ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Go channel——block为false时chansend/chanrecv的处理机制

2021-05-23 14:05:43  阅读:216  来源: 互联网

标签:缓存 false chanrecv chansend send closed sg channel


前言

本篇聚集select 2个case(1个send/recv case、1个default case)场景时sendrecv的具体处理。

更多内容分享,欢迎关注公众号:Go开发笔记

chansend

select {
case c <- v:
	... foo
default:
	... bar
}

其底层对应func为selectnbsend

selectnbsend

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

从注释中我们可以知道,select 2个case(1个send/recv case、1个default case)会被编译成if...else的形式进行处理。

注意:此时block默认为false

chansend具体实现

以下源码中省略了blocktrue的逻辑及部分debug及race的逻辑。

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block { // 对于select chan,向nil的chan发送消息会直接返回false
			return false
		}
		...
	}

	...
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) { // 对于select chan,未close时,缓存已满直接返回false
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock) // 获取锁

	if c.closed != 0 { // 如果chan已closed,则释放锁并panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {// 如果接收等待队列中有等待的接收者,直接发送到接收者。
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    // 以下是没有接收者的处理
	if c.qcount < c.dataqsiz {// 如果当前数据量少于缓存,即缓存还有剩余,存入发送缓存队列
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)// 获取数据存入缓存的位置
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)// 将数据存入缓存的指定位置
		c.sendx++// 指向下一个位置
		if c.sendx == c.dataqsiz {// 如果已达到队列大小,说明已满,重新指向开头位置
			c.sendx = 0
		}
		c.qcount++ // 数据量+1
		unlock(&c.lock)// 释放锁
		return true // 发送成功
	}
    // 当缓存已满时
	if !block {// select channel直接返回失败
		unlock(&c.lock)
		return false
	}

	...
	return true // 发送成功
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {// sg已存储发送内容
		sendDirect(c.elemtype, sg, ep)// 直接将内存发送至接收者,实际是将数据拷贝至接收者
		sg.elem = nil // 清空
	}
	gp := sg.g // 获取接收goroutine gp
	unlockf()
	gp.param = unsafe.Pointer(sg) // 设置唤醒参数为等待列表中的sg
	sg.success = true // 设置sg的状态为成功
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // 接收goroutine准备运行
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack. 跨栈拷贝

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem 
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)// 拷贝数据前添加写屏障,确保在读取和使用之间不会发生抢占点。
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size) //将数据拷贝至dst,即sg.elem中
}

此场景下使用send时,需要注意:

  • block为false,不会发生阻塞
  • nilchan send,直接返回false
  • 若chan没有closed且缓存已满,直接返回false
  • closedchan send会发送panic
  • 若已经有等待的接收者,会直接发送至接收者(直接拷贝数据到接收者)
  • 若缓存未满,则继续缓存
  • 若缓存已满,直接返回false
  • recv/close操作均可以唤醒send goroutine,区别在于close后当前goroutine会panic,recv后返回true

chanrecv

// recv1
select {
case v = <-c:
	... foo
default:
	... bar
}

// recv2
select {
case v, ok = <-c: 
	... foo
default:
	... bar
}

以上两种场景recv处理方式一致,recv1recv2分别对应底层func selectnbrecvselectnbrecv2

selectnbrecv/selectnbrecv2

// compiler implements
//
//	select {
//	case v = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbrecv(&v, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if c != nil && selectnbrecv2(&v, &ok, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(c, elem, false)
	return
}å

selectnbrecvselectnbrecv2区别在于是否有received,注意block的默认值为false

chanrecv具体实现

以下源码中省略了block为true的逻辑及部分debug及race的逻辑。

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {/
		if !block {// 对于select chan,向nil的chan接收消息会直接返回false
			return
		}
		...
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {// 对于select chan,chan缓存为空时
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 { // 如果chan已经close,直接返回
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {// 再次检查缓存是否为空
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep) //清空ep中的数据
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {// 再次检查
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {// 如果发送队列中有等待的发送者,直接从发送者接收数据
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {// 如果有缓存
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)// 获取缓存的位置
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {// 将数据拷贝至ep中
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++ // 指向下一个位置
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount-- // 减少缓存数量
		unlock(&c.lock)
		return true, true
	}

	if !block {// 对于select channel操作,无缓存时直接返回false
		unlock(&c.lock)
		return false, false
	}
	return true, success
	...
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size) // 拷贝数据
}

此场景下使用recv时,需要注意:

  • block为false
  • nilchan recv会直接返回selected=false,received=false
  • closed缓存为空chan recv会获取到chan类型的零值
  • 若已经有等待的发送者,
    • 若是无缓存chan,会直接从发送者接收数据(从发送者直接拷贝数据到接收者)
    • 否则,取待接收位置的缓存,将发送数据存储至待发送位置的缓存
  • 若有缓存,则取待接收位置的缓存
  • 若没有缓存,直接返回selected=false,received=false
  • send/close操作均可以唤醒send goroutine,区别在于:
    • close后,若有缓存,recv会返回缓存,若没有、缓存,recv会返回零值
    • send后返回true,true

总结

最后以一张图总结2个case(1个send/recv case、1个default case)场景下使用send/recv的处理逻辑:

在这里插入图片描述

与单独使用(包含select单个send/recv)的最大区别在于不会造成goroutine阻塞,不满足条件时,直接返回结果。

标签:缓存,false,chanrecv,chansend,send,closed,sg,channel
来源: https://blog.csdn.net/xz_studying/article/details/117194406

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有