golang chan 探究

前言

之前在看golang多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是php, java, Python, js, c等等, 都没有这玩意, 第一次见面, 难免勾起我的好奇心. 所以就想着看一看它具体是什么东西. 很明显, 管道是go实现在语言层面的功能, 所以我以为需要去翻他的源码了. 虽然最终没有翻到C的层次, 不过还是受益匪浅.

见真身

结构体

要想知道他是什么东西, 没什么比直接看他的定义更加直接的了. 但是其定义在哪里么? 去哪里找呢? 还记得我们是如何创建chan的么? make方法. 但是当我找过去的时候, 发现make方法只是一个函数的声明.

截屏2020-10-10 下午10.13.05

这, 还是没有函数的具体实现啊. 汇编看一下. 编写以下内容:

package main

func main() {
    _ = make(chan int)
}

执行命令:

go tool compile -N -l -S main.go

虽然汇编咱看不懂, 但是其中有一行还是引起了我的注意.

image-20201010221859435

make调用了runtime.makechan. 漂亮, 就找他.

image-20201010222031325

找到他了, 是hchan指针对象. 整理了一下对象的字段(不过人家自己也有注释的):

// 其内部维护了一个循环队列(数组), 用于管理发送与接收的缓存数据. 
type hchan struct {
  // 队列中元素个数
    qcount   uint
  // 队列的大小(数组长度)
    dataqsiz uint
  // 指向底层的缓存队列, 是一个可以指向任意类型的指针. 
    buf      unsafe.Pointer
  // 管道每个元素的大小
    elemsize uint16
  // 是否被关闭了
    closed   uint32
  // 管道的元素类型
    elemtype *_type
  // 当前可以发送的元素索引(队尾)
    sendx    uint  
  // 当前可以接收的元素索引(队首)
    recvx    uint  
  // 当前等待接收数据的 goroutine 队列
    recvq    waitq
  // 当前等待发送数据的 goroutine 队列
    sendq    waitq 
    // 锁, 用来保证管道的每个操作都是原子性的. 
    lock mutex
}

可以看的出来, 管道简单说就是一个队列加一把锁.

发送数据

依旧使用刚才的方法分析, 发送数据时调用了runtime.chansend1 函数. 其实现简单易懂:

image-20201010225259028

然后查看真正实现, 函数步骤如下(个人理解, 有一些 test 使用的代码被我删掉了. ):

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 异常处理, 若管道指针为空
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // 常量判断, 恒为 false, 应该是开发时调试用的. 
    if debugChan {
        print("chansend: chan=", c, "\n")
    }
    // 常量, 恒为 false, 没看懂这个判断
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }
  // 若当前操作不阻塞, 且管道还没有关闭时判断
  // 当前队列容量为0且没有等待接收数据的 或 当前队列容量不为0且队列已满
  // 那么问题来了, 什么时候不加锁呢? select 的时候. 可以在不阻塞的时候快速返回
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    // 上锁, 保证操作的原子性
    lock(&c.lock)
    // 若管道已经关闭, 报错
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // 从接受者队列获取一个接受者, 若存在, 数据直接发送, 不走缓存, 提高效率
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // 若缓存为满, 则将数据放到缓存中排队
    if c.qcount < c.dataqsiz {
    // 取出对尾的地址
        qp := chanbuf(c, c.sendx)
    // 将ep 的内容拷贝到 ap 地址
        typedmemmove(c.elemtype, qp, ep)
    // 更新队尾索引
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // 若当前不阻塞, 直接返回
    if !block {
        unlock(&c.lock)
        return false
    }
    // 当走到这里, 说明数据没有成功发送, 且需要阻塞等待. 
  // 以下代码没看懂, 不过可以肯定的是, 其操作为阻塞当前协程, 等待发送数据
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    KeepAlive(ep)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

虽然最终阻塞的地方没看太明白, 不过发送数据的大体流程很清楚:

  1. 若无需阻塞且不能发送数据, 返回失败
  2. 若存在接收者, 直接发送数据
  3. 若存在缓存, 将数据放到缓存中
  4. 若无需阻塞, 返回失败
  5. 阻塞等待发送数据

其中不加锁的操作, 在看到selectnbsend函数的注释时如下:

// compiler implements
//
//  select {
//  case c <- v:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if selectnbsend(c, v) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}

看这意思, select关键字有点类似于语法糖, 其内部会转换成调用selectnbsend函数的简单if判断.

接收数据

至于接收数据的方法, 其内部实现与发送大同小异. runtime.chanrecv 方法.

源码简单看了一下, 虽理解不深, 但对channel也有了大体的认识.

上手

简单对channel的使用总结一下.

定义

// 创建普通的管道类型, 非缓冲
a := make(chan int)
// 创建缓冲区大小为10的管道
b := make(chan int, 10)
// 创建只用来发送的管道
c := make(chan<- int)
// 创建只用来接收的管道
d := make(<-chan int)
// eg: 只用来接收的管道, 每秒一个
e := time.After(time.Second)

发送与接收

// 接收数据
a := <- ch
b, ok := <- ch
// 发送数据
ch <- 2

最后, 看了一圈, 感觉channel并不是很复杂, 就是一个队列, 一端接受, 一端发送. 不过其对多协程处理做了很多优化. 与协程配合, 灵活使用的话, 应该会有不错的效果.

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请发表评论。x