ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Golang笔记: channel

2021-01-14 15:02:54  阅读:199  来源: 互联网

标签:nil gp goroutine 笔记 Golang 缓冲区 sg channel


Golang笔记: channel

定义

不要通过共享内存来通信,而是通过通信来实现内存共享

多个 goroutine 借助 channel 来传输数据,实现了跨 goroutine 间的数据传输,多者独立运行,不需要强关联,更不影响对方的 goroutine 状态。不存在 goroutine1 对 goroutine2 进行直传的情况。
图片

channel 基本特性

分类

channel 共有两种模式,分别是:双向和单向;三种表现方式,分别是:声明双向通道:chan T、声明只允许发送的通道:chan <- T、声明只允许接收的通道:<- chan T

channel 中还分为 “无缓冲 channel” 和 “缓冲 channel”。

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;

数据结构

channel 图解

图片

hchan 结构体

// src/runtime/chan.go
type hchan struct {
 qcount   uint      
 dataqsiz uint     
 buf      unsafe.Pointer 
 elemsize uint16
 closed   uint32
 elemtype *_type 
 sendx    uint  
 recvx    uint  
 recvq    waitq  
 sendq    waitq  

 lock mutex
}

/*
- qcount:队列中的元素总数量。
- dataqsiz:循环队列的长度。
- buf:指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义。
- elemsize:能够接受和发送的元素大小。
- closed:是否关闭。
- elemtype:能够接受和发送的元素类型。
- sendx:已发送元素在循环队列中的索引位置。
- recvx:已接收元素在循环队列中的索引位置。
- recvq:接受者的 sudog 等待队列(缓冲区不足时阻塞等待的 goroutine)。
- sendq:发送者的 sudog 等待队列。
*/

recvqsendq,其表现为等待队列,其类型为 runtime.waitq 的双向链表结构

type waitq struct {
 first *sudog
 last  *sudog
}

// 无论是 first 属性又或是 last,其类型都为 runtime.sudog 结构体
type sudog struct {
 g *g

 next *sudog
 prev *sudog
 elem unsafe.Pointer
 ...
}

/*
g:指向当前的 goroutine。
next:指向下一个 g。
prev:指向上一个 g。
elem:数据元素,可能会指向堆栈。
*/

Chan使用

创建chan

ch := make(chan string)
ch := make(chan string, 1024)

创建 channel 的逻辑主要分为三大块:

  • 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。
  • 当前 channel 存储的类型存在指针引用,就会连同 hchan 和底层数组同时分配一段连续的内存空间。
  • 通用情况,默认分配相匹配的连续内存空间。

需要注意到一块特殊点,那就是 channel 的创建都是调用的 mallocgc 方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close 方法来进行显示关闭了。

向chan中写入数据

向一个channel中写数据过程

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     if c.closed != 0 { // 会对 channel 进行一次状态判断(是否关闭
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
     }
    
     if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
     }
    }
    
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     if c.qcount < c.dataqsiz {
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
       c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
     }
    
     if !block {
      unlock(&c.lock)
      return false
     }
    }
    /*
    会对缓冲区进行判定(`qcount` 和 `dataqsiz` 字段),以此识别缓冲区的剩余空间。紧接进行如下操作:
    
    - 调用 `chanbuf` 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。
    - 调用 `typedmemmove` 方法,将所需发送的数据拷贝到缓冲区中。
    - 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
    - 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。
    */
    

    图片

  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
      mysg.releasetime = -1
     }
    
     mysg.elem = ep
     mysg.waitlink = nil
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.waiting = mysg
     gp.param = nil
     c.sendq.enqueue(mysg)
    
     atomic.Store8(&gp.parkingOnChan, 1)
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
     KeepAlive(ep)
    }
    /*
    - 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。
    - 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。
    - 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。
    - 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。
    - 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
    */
    

    图片

流程图

img

从chan中读取数据

前置处理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 if c == nil {
  if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
/*
一开始时 chanrecv 方法会判断其是否为 nil channel。

场景如下:

若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。
*/
    
 if !block && empty(c) {
  if atomic.Load(&c.closed) == 0 {
   return
  }

  if empty(c) {
   if ep != nil {
    typedmemclr(c.elemtype, ep)
   }
   return true, false
  }
 }
 ...
}
/*
其分以下几种情况:

无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。

但若是 channel 已经关闭且不存在缓存数据了,则会清理 ep 指针中的数据并返回。
*/

从channel读数据过程

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     lock(&c.lock)
    
     if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
     }
     ...
    }
    
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;

  3. 如果等待发送队列sendq为空、缓冲区中有数据,则从缓冲区取出数据,结束读取过程;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     if c.qcount > 0 {
      qp := chanbuf(c, c.recvx)
      if ep != nil {
       typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
       c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
     }
    
     if !block {
      unlock(&c.lock)
      return false, false
     }
     ...
    }
    
  4. 如果等待发送队列sendq为空、缓冲区中无数据,将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
      mysg.releasetime = -1
     }
    
     mysg.elem = ep
     mysg.waitlink = nil
     gp.waiting = mysg
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.param = nil
     c.recvq.enqueue(mysg)
    
     atomic.Store8(&gp.parkingOnChan, 1)
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
     ...
    }
    

流程图

img

特性

  1. 读队列Recvq, 写入数据时G被唤醒; 写队列Sendq, 读取数据时G被唤醒
  2. sendq有G有数据; recvq有G无数据

Chan关闭

panic出现的常见场景:

  1. 关闭值为nil的channel(未make)
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

channel异常总结

sendq、recvq G释放

释放接收方

在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:

func closechan(c *hchan) {

 var glist gList
 for {
  sg := c.recvq.dequeue()
  if sg == nil {
   break
  }
  if sg.elem != nil {
   typedmemclr(c.elemtype, sg.elem)
   sg.elem = nil
  }
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 ...
}

所取出并加入的 goroutine 状态需要均为 _Gwaiting,以保证后续的新一轮调度。

释放发送方

同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:

func closechan(c *hchan) {

 // release all writers (they will panic)
 for {
  sg := c.sendq.dequeue()
  if sg == nil {
   break
  }
  sg.elem = nil
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 unlock(&c.lock)
 ...
}

协程调度

将所有 glist 中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度:

func closechan(c *hchan) {

 // Ready all Gs now that we've dropped the channel lock.
 for !glist.empty() {
  gp := glist.pop()
  gp.schedlink = 0
  goready(gp, 3)
 }
}

后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获自由,后续该干嘛就去干嘛了,再跑回其所属的应用流程。

channel send/recv 分析

send

send 方法承担向 channel 发送具体数据的功能:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if sg.elem != nil {
  sendDirect(c.elemtype, sg, ep)
  sg.elem = nil
 }
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
 dst := sg.elem
 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
 memmove(dst, src, t.size)
}
  • 调用 sendDirect 方法将待发送的数据直接拷贝到待接收变量的内存地址(执行栈)。

    • 例如:msg := <-ch 语句,也就是将数据从 ch 直接拷贝到了 msg 的内存地址。
  • 调用 sg.g 属性, 从 sudog 中获取等待接收数据的 goroutine,并传递后续唤醒所需的参数。

  • 调用 goready 方法唤醒需接收数据的 goroutine,期望从 _Gwaiting 状态调度为 _Grunnable

recv

recv 方法承担在 channel 中接收具体数据的功能:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if c.dataqsiz == 0 {
  if ep != nil {
   recvDirect(c.elemtype, sg, ep)
  }
 } else {
  qp := chanbuf(c, c.recvx)
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
  typedmemmove(c.elemtype, qp, sg.elem)
  c.recvx++
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
 }
 sg.elem = nil
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

该方法在接受上分为两种情况,分别是直接接收和缓冲接收:

直接接收(不存在缓冲区):

  • 调用 recvDirect 方法,其作用与 sendDirect 方法相对,会直接从发送方的 goroutine 调用栈中将数据拷贝过来到接收方的 goroutine。

缓冲接收(存在缓冲区):

  • 调用 chanbuf 方法,根据 recvx 索引的位置读取缓冲区元素,并将其拷贝到接收方的内存地址。
  • 拷贝完毕后,对 sendxrecvx 索引位置进行调整。

最后还是常规的 goroutine 调度动作,会调用 goready 方法来唤醒当前所处理的 sudog 的对应 goroutine。那么在下一轮调度时,既然已经接收了数据,自然发送方也就会被唤醒。

细节

  1. channel 都是创建在堆上的。因此 channel 是会被 GC 回收的;
  2. 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
  3. 在具体的数据传输上,都是围绕着 “边界上下限处理,上互斥锁,阻塞/非阻塞,缓冲/非缓冲,缓存出队列,拷贝数据,解互斥锁,协程调度” 在不断地流转处理。在基本逻辑上也是相对重合的,因为发送和接收,创建和关闭总是相对的。

参考博客

Go语言基础之并发
一文带你解密 Go 语言之通道 channel
Go专家编程(书籍购买)

标签:nil,gp,goroutine,笔记,Golang,缓冲区,sg,channel
来源: https://blog.csdn.net/qq_36652517/article/details/112607822

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有