前言
之前在看golang
多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是php
, java
, Python
, js
, c
等等, 都没有这玩意, 第一次见面, 难免勾起我的好奇心. 所以就想着看一看它具体是什么东西. 很明显, 管道是go
实现在语言层面的功能, 所以我以为需要去翻他的源码了. 虽然最终没有翻到C
的层次, 不过还是受益匪浅.
见真身
结构体
要想知道他是什么东西, 没什么比直接看他的定义更加直接的了. 但是其定义在哪里么? 去哪里找呢? 还记得我们是如何创建chan
的么? make
方法. 但是当我找过去的时候, 发现make
方法只是一个函数的声明.
这, 还是没有函数的具体实现啊. 汇编看一下. 编写以下内容:
package main
func main() {
_ = make(chan int)
}
执行命令:
go tool compile -N -l -S main.go
虽然汇编咱看不懂, 但是其中有一行还是引起了我的注意.
make
调用了runtime.makechan
. 漂亮, 就找他.
找到他了, 是hchan
指针对象. 整理了一下对象的字段(不过人家自己也有注释的):
// 其内部维护了一个循环队列(数组), 用于管理发送与接收的缓存数据.
type hchan struct {
// 队列中元素个数
qcount uint
// 队列的大小(数组长度)
dataqsiz uint
// 指向底层的缓存队列, 是一个可以指向任意类型的指针.
buf unsafe.Pointer
// 管道每个元素的大小
elemsize uint16
// 是否被关闭了
closed uint32
// 管道的元素类型
elemtype *_type
// 当前可以发送的元素索引(队尾)
sendx uint
// 当前可以接收的元素索引(队首)
recvx uint
// 当前等待接收数据的 goroutine 队列
recvq waitq
// 当前等待发送数据的 goroutine 队列
sendq waitq
// 锁, 用来保证管道的每个操作都是原子性的.
lock mutex
}
可以看的出来, 管道简单说就是一个队列加一把锁.
发送数据
依旧使用刚才的方法分析, 发送数据时调用了runtime.chansend1
函数. 其实现简单易懂:
然后查看真正实现, 函数步骤如下(个人理解, 有一些 test 使用的代码被我删掉了. ):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 异常处理, 若管道指针为空
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 常量判断, 恒为 false, 应该是开发时调试用的.
if debugChan {
print("chansend: chan=", c, "\n")
}
// 常量, 恒为 false, 没看懂这个判断
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// 若当前操作不阻塞, 且管道还没有关闭时判断
// 当前队列容量为0且没有等待接收数据的 或 当前队列容量不为0且队列已满
// 那么问题来了, 什么时候不加锁呢? select 的时候. 可以在不阻塞的时候快速返回
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
// 上锁, 保证操作的原子性
lock(&c.lock)
// 若管道已经关闭, 报错
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 从接受者队列获取一个接受者, 若存在, 数据直接发送, 不走缓存, 提高效率
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 若缓存为满, 则将数据放到缓存中排队
if c.qcount < c.dataqsiz {
// 取出对尾的地址
qp := chanbuf(c, c.sendx)
// 将ep 的内容拷贝到 ap 地址
typedmemmove(c.elemtype, qp, ep)
// 更新队尾索引
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 若当前不阻塞, 直接返回
if !block {
unlock(&c.lock)
return false
}
// 当走到这里, 说明数据没有成功发送, 且需要阻塞等待.
// 以下代码没看懂, 不过可以肯定的是, 其操作为阻塞当前协程, 等待发送数据
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
虽然最终阻塞的地方没看太明白, 不过发送数据的大体流程很清楚:
- 若无需阻塞且不能发送数据, 返回失败
- 若存在接收者, 直接发送数据
- 若存在缓存, 将数据放到缓存中
- 若无需阻塞, 返回失败
- 阻塞等待发送数据
其中不加锁的操作, 在看到selectnbsend
函数的注释时如下:
// compiler implements
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
看这意思, select
关键字有点类似于语法糖, 其内部会转换成调用selectnbsend
函数的简单if
判断.
接收数据
至于接收数据的方法, 其内部实现与发送大同小异. runtime.chanrecv
方法.
源码简单看了一下, 虽理解不深, 但对channel
也有了大体的认识.
上手
简单对channel
的使用总结一下.
定义
// 创建普通的管道类型, 非缓冲
a := make(chan int)
// 创建缓冲区大小为10的管道
b := make(chan int, 10)
// 创建只用来发送的管道
c := make(chan<- int)
// 创建只用来接收的管道
d := make(<-chan int)
// eg: 只用来接收的管道, 每秒一个
e := time.After(time.Second)
发送与接收
// 接收数据
a := <- ch
b, ok := <- ch
// 发送数据
ch <- 2
最后, 看了一圈, 感觉channel
并不是很复杂, 就是一个队列, 一端接受, 一端发送. 不过其对多协程处理做了很多优化. 与协程配合, 灵活使用的话, 应该会有不错的效果.