Goroutine 使用的一些经验谈
本文最后更新于:2022年11月29日 晚上
在工作中 Goroutine 使用得相当多,积累了不少经验,也逐渐学习了一些小 tricks,在此进行一些总结 。
如何控制 Goroutine 的数量
关于 Goroutine 的语法不做赘述。
首先,使用协程池还是直接创建新的 Goroutine 并限制数量(协程池会尽可能复用而不是创建)仁者见仁,智者见智,由于 Goroutine 比较轻量级,即便创建新的 Routine 资源消耗也不会很大,在并发数不高的情况下,可以不用过多在意,详情可以参考下这篇回答[1]
下面会介绍提到的两种限制数量的方法。
开源协程池
主流的为以下两个:
以 ants 为例,ants 在 README 中其实有比较详细的介绍:
common pool
1 |
|
正如其名,其很通用,因为 Sumbit()
完全可以传递完全不同的函数作为参数。
有一些注意点:
不能直接去
Submit
我们的 workerFunction (即demoFunc()
),正如使用 Goroutine 的一般经验,需要使用去WaitGroup
去进行控制,示例使用了syncCalculateSum
进行了一层包络,同时在调用前要wg.Add(1)
尽管示例中的
demoFunc()
没有参数,而且Submit()
的函数签名中可以看到其能接受的函数参数也没有任何参数值,但是我们可以使用closure
闭包机制去传递参数。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25func demoFunc(person string) {
fmt.Println("Hello " + person)
}
defer ants.Release()
var people = []string{
"Alice",
"Bob",
}
// Use the common pool.
var wg sync.WaitGroup
for _, person := range peple {
localPerson := person // important!
f := func(){
demoFunc(localPerson)
wg.Done()
}
wg.Add(1)
_ = ants.Submit(f)
}
wg.Wait()
Pool with a func
1 |
|
与 Common Pool 相比,其只执行特定的函数,但在使用思路上大体一致,同样需要使用WaitGroup
进行控制。
示例中只传递了一个参数,同时NewPoolWithFunc()
的函数签名中也可以看到其接受的函数参数的参数值只有一个interface{}
,也缺少一个类似于 func NewPoolWithFunc(size int, pf func(...interface{}), options ...Option)
的接口,为了能够传递多个参数可能就需要一些 trick.
一种方法就是将 workFunc 需要的所有参数包装成一个结构体,再拆解传递:
1 |
|
另一种方法则是上面提到的闭包机制。
关于性能
还是需要强调,在 worker 数量并不大的情况下,协程池能带来的性能提升是极其有限的,尽管可能会有比较明显的内存使用优势(因为复用机制)[2],但协程池的使用毕竟增加了复杂度,这点需要进行权衡。
直接创建新的 Routine
除了使用协程池去对 Goroutine 进行复用,还可以直接创建新的 Goroutine ,需要的则是添加一些对数量进行限制的逻辑:
1 |
|
核心思想就是使用空结构体的带有缓冲的channel
,利用缓冲区满时 Block 的性质可以很容易地控制并发的数量。
这里很推荐将任务逻辑包装成一个workerFunc
;尽管不那么做也可以,比如像下面这样:
1 |
|
但是这里会有一些潜藏的风险,如果你的逻辑中有多个 return,最容易出现的错误就是只在末尾添加<-workerLimiter
和wg.Done()
,一旦执行不到末尾而提前 return,就有可能会造成程序的阻塞:
- 如果 worker 数量大于等于 task 数量,会在
wg.Wait()
时阻塞 - 如果 worker 数量小于 task 数量,则会在
workLimiter <- struct{}{}
处阻塞
为了避免遗忘添加,最好的方法就是使用defer
,不过由于defer
只能和函数连用,所以将相关资源释放的逻辑整合到同一个函数中:
1 |
|
收集数据
使用 Goroutine 往往是为了将比较多的数据进行分批处理从而加快执行速度,所以一般最后需要对 worker 处理得到的结果进行汇集。
如果结果可以存储在 map 确定的 key 对应的位置或者数组中某个确定的 index 对应的位置中,那么无需担心并发访问的问题。但如果只是将结果添加到数组末尾,且添加位置取决于完成的先后顺序,那么需要考虑 thread-safe 的问题。
使用 sync.Mutex 来保护
1 |
|
最直接的方法就是使用sync.Mutex
来做同步。
使用 channel 来收集数据
channel
本身是 thread-safe 的,使用channel
去做同步也是官方推荐的一种方法:
1 |
|
在代码中,worker routine 中将任务结果发送到channel
中,在主 routine 中(区分于go
关键词创建的 routine)使用for
从channel
中不断地收集数据;
为了能够让主 routine 及时退出循环,这里需要使用另一个单独的 routine 来负责在所有任务完成后关闭channel
,这段逻辑如果写在主 routine 的任何地方都会导致阻塞[3]:
- 如果在 Position 1 处,由于 unbuffered channel 可以看作 always-full,在没有人从
resChan
处接收数据时,worker routine 作为 sender 会阻塞,这意味着wg.Done()
永远无法被执行,则wg.Wait()
永远无法退出,从而造成主 routine 的阻塞 - 如果在 Position 2 处,由于没有人去 close channel,主 routine 永远无法退出
for
循环
由于这些要注意的点,这么写确实显得很麻烦,远不如前一种那么简单直观,在多数情况下我也推荐第一种,但是有一些很特别的情况,比如你既不想使用goto
同时你又不希望执行到底部,想要提前退出并发送 res,在这种情况下,前一种方法需要写
1 |
|
后一种方法则只需要写
1 |
|
尤其在这样的代码段可能会在你的 task 逻辑中大量反复出现时,后一种方法形势上要更整洁统一。
如何在 Goroutine 中处理错误
在使用 Goroutine 的一些场景中,worker 可能会在执行过程中产生一些error
,关于如何处理,Go Sync提供了一个强大的“武器”——errgroup.Group
后续有空的话会进一步拓展这个话题,有一些代码片段可供参考[4][5],这篇博文[6]也写得很好,深入浅出,可以好好品读一下。
errgroup.Group.Go()
无法接受带有参数的函数参数,可以像给出的参考[4][5][6]中那样使用channel
传递数据,当然也可以使用之前介绍中提到的闭包机制。
闭包
严格来说这不是一个跟 Goroutine 相关的话题,所以在这里不会做详细地介绍。
可能注意到在 common pool 中有这么一个代码片段:
1 |
|
localPerson := person
显得很多余,但其实不然,如果有过 Python 程序编写经验的人会很容易明白,person
这个变量指向的其实是同一个地址,如果不做 copy,它会被所有的 Goroutine 共享[7]。
可以自己简单验证一下:
1 |
|
结果为
1 |
|
修改后:
1 |
|
结果为
1 |
|
参考
- Does a goroutine pool make sense like thread pools in other languages? ↩
- 测试发现使用池计算时间并没有缩短,只有内存占用变小了。 ↩
- Why does the use of an unbuffered channel in the same goroutine result in a deadlock? ↩
- errgroup_example_md5all_test.go ↩
- another errgroup example ↩
- Why you should be using errgroup.WithContext() in your Golang server handlers ↩
- go vet range variable captured by func literal when using go routine inside of for each loop ↩