[奔跑的 Go] 教程十七、深入学习 Go 并发编程之通道(Channel)
通道是什么?
通道 是用于协程间交流的通信载体。严格地来说,通道就是数据传输的管道,数据通过这根管道被“传入”或被“读出”。 因此协程可以发送数据到通道中,而另一个协程可以从该通道中读取数据。
声明一个通道
Go提供一个chan
关键词去创建一个通道。一个通道只能传入一种类型的数据,其他的数据类型不允许被传输。
https://play.golang.org/p/iWOFLfcgfF-
上面的程序声明了一个可以传入int类型数据的通道c
。上面的程序会打印<nil>
,因为通道的0值是nil
。但是一个nil通道是没有用的。你不能向它传递数据或者读取数据。因此,我们必须使用make函数器创建一个可以使用的通道。
https://play.golang.org/p/N4dU7Ql9bK7
我们可以使用一个简洁的符号:=
和make函数来创建一个通道。看上面程序执行的结果
type of `c` is chan int
value of `c` is 0xc0420160c0
注意看通道C的值。它是一个指针内存地址。通道变量默认是一个指针。多数情况下,当你想要和一个协程沟通的时候,你可以给函数或者方法传递一个通道作为参数。当从协程接收到通道参数后,你不需要再对其进行解引用就可以从通道接收或者发送数据。
通道的读写操作
Go语言提供一个非常简洁的左箭头语法 <-
去从通道读写数据。
c <- data
上面的代码意味着我们想要把 data
数据推入到通道 c
. 注意看箭头的指向.它表明是从 data
to 通道 c
. 因此我们可以猜到我们正在把 data
推入到通道 c
.
<- c
上面的代码表明我们想要从通道 c
读一些数据. 注意看箭头的指向,它从通道 c
开始. 这个语句不会把数据传输给任何变量, 但是仍然是一个有效的语句. 如果你希望有一个变量来接收通道C的数据,你可以使用下面的语句
var data int
data = <- c
现在来自int类型通道c的数据就可以被存储到int类型变量data中。
上面的语句也可以使用语法:=
重写
data := <- c
Go语言将根据通道C的数据类型来自动判断data变量的数据类型。
上面的通道操作默认是阻塞的. 在以前的课程中,我们知道可以使用 time.Sleep
去阻塞一个通道. 通道操作本质上是阻塞的.当一些数据被写入通道,对应的协程将阻塞直到有其他的协程可以从此通道接收数据。 同时正如我们之前看的文章 concurrency chapter
, 通道操作会通知调度器去调度其他的协程, 这就是为什么程序不会一直阻塞在一个协程. 通道的这些特性在不同的协程沟通的时候非常有用,它避免了我们使用锁或者一些hack手段去达到阻塞协程的目的。
通道实战
通道已经介绍差不多了,让我们结合协程来实战一下。
https://play.golang.org/p/OeYLKEz7qKi
让我们一步步剖析下上面程序的执行步骤。
- 我们声明一个函数
greet
,这个函数的参数c
是一个string类型的通道. 在这个函数中,我们从通道c
中接收数据并打印到控制台上。 - main函数的第一个语句是打印
main started
到控制台。 - 接下来我们在main函数中使用make函数创建一个string类型的通道赋值给‘c’变量
- 我们把c通道传递给
greet
函数并用go关键词以协程方式运行它。 - 此时, 程序有两个协程并且正在调度运行的是
main goroutine
(阅读以前的文章就明白为什么). 接下来运行下一行代码. - 我们给通道
c
传入一个数据John
. 此时主线程将阻塞直到有协程接收这个数据. Go的调度器开始调度greet
协程接收通道c的数据,执行逻辑参照第一步的说明。 - 然后主线程激活并且执行后面的语句,打印
main stopped
死锁
正如上面所说,当通道读写数据时,所在协程会阻塞并且调度控制权会转移到其他未阻塞的协程。
如果当前协程正在从一个没有任何值的通道中读取数据,那么当前协程会阻塞并且等待其他协程往此通道写入值。因此,读操作将被阻塞。类似的,如果你发送数据到一个通道,它将阻塞当前协程直到有其他协程从通道中读取数据。此时写操作将阻塞.
下面是一个主线程在进行通道操作的时候造成死锁的例子
https://play.golang.org/p/2KTEoljdci_f
上面的程序将抛出下面的运行时错误
main() started
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
program.Go:10 +0xfd
exit status 2
fatal error: all goroutines are asleep — deadlock!. 这是在说所有协程都进入休眠状态,没有协程是可被调度的。
☛ 关闭通道
一个通道关闭后没有数据可以再被写入。从通道接收数据的协程可以通过语句val, ok := <- channel
来判别通道当前的状态,如果ok
is true
表示通道是打开的或者读操作可以执行,如果ok
is false
表示通道是关闭的或者读操作不可以执行。一个通道可以使用colse函数关闭,例如 close(channel)
。 让我们看一个简单的例子
https://play.golang.org/p/LMmAq4sgm02
上面代码仅仅是帮助你理解阻塞的概念, 第一个操作
c <- "John"
将阻塞协程直到有其他协程从此通道中读取数据, 因此greet
会被调度器调度执行. 然后第一个操作<-c
是非阻塞的 因为现在通道c
有数据可读. 第二个操作<-c
将被阻塞因为通道c
已经没数据可读, 此时main
协程将被激活并且程序执行close(c)
关闭通道操作。
从上面的错误中我们看到我们试图向一个已经关闭的通道中发送数据带来的影响。此外,如果我们试图从一个已关闭的通道中读取数据,程序将会异常。为了更好的理解closed channels带来的影响,我们看下面的for
循环的例子。
☛ For循环
我们可以使用 for
无限循环语句 for{}
不间断的从通道中读取数据。
https://play.golang.org/p/X58FTgSHhXi
在上面的例子中, 我们创建一个协程 squares
用来逐个计算1-9数字的平方并把他们放入通道中. 在主线程中我们使用for循环不间断从通道中读取这些数据。
因为是无限循环,所以我们需要在一个阶段使用break去中断循环,我们使用语句 val, ok := <-c
从通道中读取数据. 其中 ok
将返回通道是否关闭的信息. 所以在squares
协程中, 所有数据计算发送后我们将使用close(c)
关闭通道. 主线程中当 ok
is true
, 控制台输出 val
变量值并且通道状态是 ok
. 当通道状态是 false
的时候, 我们使用 break
关键词去退出循环,主线程退出。 下面是程序执行结果
main() started
0 true
1 true
4 true
9 true
16 true
25 true
36 true
49 true
64 true
81 true
0 false <-- loop broke!
main() stopped
当通道被关闭后我们在主线程中还可以读取0值数据。因为这个通道是用来传输int类型数据,默认情况下int的空值是0被返回。(注:从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值)
为了避免我们在for循环中使用break这种手段去判断通道关闭退出, Go给出了一个语法for range
,当通道关闭的时候它将主动退出循环. 我们来修改之前的代码
https://play.golang.org/p/ICCYbWO7ZvD
在上面的程序中我们使用 for val := range c
代替 for{}
. range
将循环从通道中读取数据直到它关闭。下面是代码打印结果
main() started
0
1
4
9
16
25
36
49
64
81
main() stopped
如果在 使用
for range
循环时候不主动关闭通道, 程序将产生运行时死锁。(译者注:个人理解,在1-9循环打印完毕后不关闭通道,子协程退出,此时主线程的range依然会从c通道接收值,但是已无值可接收,主线程将阻塞,程序会死锁。)
☛ 通道容量
正如我们看到的,每次往无缓冲通道发送数据会阻塞当前协程,这是因为我们还没有使用 make
函数的第二个参数。这第二个参数代表通道缓冲区的容量。 默认情况下通道缓冲区容量为0,被称为无缓冲通道。 向无缓冲通道发送的数据需要立即被读取。
当缓冲区参数不是0的时候。协程将不会阻塞除非缓冲区被填满。 当缓冲区满了之后,想要再往缓冲区发送数据只有等到有其他协程从缓冲区接收数据, (此时的发送协程是阻塞的). 有一点需要注意, 读缓冲区的操作是渴望式读取. 意味着一旦读操作开始它将读取缓冲区所有数据,直到缓冲区为空。原理上来说读操作的协程将不会阻塞直到缓冲区为空.
我们可以用以下语法定义有缓冲通道
c := make(chan Type, n)
它创建了一个Type
类型的通道并带着一个大小为 n
缓冲区。 除非缓冲区被填满,也就是数量超过n,否则当前协程一直可以往缓冲区发数据并且不会阻塞。
上面说了当前协程不会阻塞直到缓冲区满了,让我们用代码实践一下
https://play.golang.org/p/k0usdYZfp3D
在上面的程序中通道 c
缓冲区大小为3
. 这意味着它可以盛放 3
个值, 看代码20行,缓冲区没有溢出(因为我们没有再 往缓冲区放任何值), 这样主线程也就不会阻塞,直接退出程序。
让我们再加一个值
https://play.golang.org/p/KGyiskRj1Wi
正如上面所看到的,我们添加了 c <- 4
向已满的缓冲区又发送了一个数据,此时主协程阻塞, squares
协程调度并且会依次从缓冲区读取所有数据并打印。
通道的长度和容量
和切片类似,一个缓冲通道也有长度和容量。通道的长度是其内部缓冲队列未读的数据量,而通道的容量是缓冲区可最大盛放的数据量。我们可以使用len
函数去计算通道的长度,使用cap
函数去获得通道的容量。是不是和切片用法神似!
https://play.golang.org/p/qsDZu6pXLT7
你是否很疑惑上面的程序为什么没有发生死锁。这是因为这个c通道容量为3,但只盛放了2个数据。Go就不用去阻塞主线程去调度其他协程。 你也可以在主线程中去读取这些数据, 因为 虽然通道没有放满,也不会阻止你去从通道读取数据.
下面是一个很酷的例子
https://play.golang.org/p/-gGpm08-wzz
来测试一下自己的是否理解
https://play.golang.org/p/sdHPDx64aor
使用缓冲区通道和 for range
, 我们可以从已关闭的通道中读取数据。 虽然通道关闭了,但是数据还在缓冲区,我们仍然可以获取这些数据。
https://play.golang.org/p/vULFyWnpUoj
缓冲区通道像 Pythagoras Cup, 感兴趣的可以观看这个视频 Pythagoras Cup.
多协程协同工作
让我们写两个协程,一个用来计算数字的平方,另一个用来计算数字的立方。
https://play.golang.org/p/6wdhWYpRfrX
下面来一步步梳理下程序的执行流程。
- 我们创建两个函数
square
和cube
作为协程运行。 两个函数都有一个int类型通道参数c
,我们从c
中读取数据到变量num
,最后我们把计算的数据再写入到通道c
中。 - 在主线程中我们使用make函数创建两个int类型通道
squareChan
andcubeChan
- 然后我们分别运行
square
和cube
协程。 - 因为调度权还在主线程,所以执行
testNumb
赋值为3。 - 然后我们把数据放入通道
squareChan
。主线程将阻塞直到通道的数据被读取。 一旦通道的数据被读取,主线程将继续执行。 - 在主线程中我们试图从这两个通道中读取数据,此时线程可能阻塞直到有数据写入到通道。这里我们使用
:=
语法来接收多个通道的值。 - 一旦这些协程把数据写入到通道,主线程将阻塞(存疑,原文是这样,不过这里貌似有问题)。
- 当数据被写入通道中,主线程将继续执行,最后我们计算出数字的总和并打印到控制台。
下面是程序的运行结果
[main] main() started
[main] sent testNum to squareChan
[square] reading
[main] resuming
[main] sent testNum to cubeChan
[cube] reading
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3 is 36
[main] main() stopped
☛ 单向通道
目前为止,我们已经学习到可以双向传递数据的通道,或者说,我们可以对通道做读操作和写操作。但是事实上我们也可以创建单向通道。比如只读通道只允许读操作,只写通道只允许写操作。
单向通道也可以使用make函数创建,不过需要额外加一个箭头语法。
roc := make(<-chan int)
soc := make(chan<- int)
在上面的程序中, roc
是一个只读通道,<- 在chan关键词前面。 soc
is 只写通道,<- 在chan关键词后面。 他们也算不同的数据类型。
https://play.golang.org/p/JZO51IoaMg8
但是单向通道有什么作用呢?使用单向通道可以 提高程序的类型安全性, 使得程序不容易出错。
但是假如你在一个协程中只需要读操作某通道,但是在主线程中却需要读写操作这个通道该怎么办呢?
幸运的是Go提供了一个简单的语法去把双向通道转化为单向通道。
https://play.golang.org/p/k3B3gCelrGv
我们修改 greet
协程函数,把参数 c
类型从双向通道改成单向接收通道。 现在我们只能从通道中读取数据,通道上的任何写入操作将会发生错误: "invalid operation: roc <- "some text" (send to receive-only type <-chan string)"
.
☛ 匿名协程
在 goroutines
章节, 我们学习了 匿名协程. 我们也可以在匿名协程上使用通道。让我们修改之前的例子来在匿名协程上使用通道。
这是之前的例子
https://play.golang.org/p/c5erdHX1gwR
下面是我们用匿名协程修改后的例子
https://play.golang.org/p/cM5nFgRha7c
☛ 通道类型的通道
通道也是一种类型可以像其他类型一样被用在很多地方: 比如结构体元素,函数参数, 函数返回值,甚至是作为其他通道的数据传输类型. 下面我们了解下怎么使用通道作为通道的数据传输类型。
https://play.golang.org/p/xVQvvb8O4De
☛ Select
select
和 switch
很像,它不需要输入参数,并且仅仅被使用在通道操作上。
Select语句被用来执行多个通道操作的一个和其附带的case
块代码。
让我们来看下面的例子,讨论下其执行原理
https://play.golang.org/p/ar5dZUQ2ArH
从上面的程序来看,我们知道 select
语句和 switch
很像,不同点是用通道读写操作代替了布尔操作。通道将被阻塞,除非它有默认的 default 块 (我们之后将看到). 一旦某个case条件执行,它将不阻塞. 所以一个 case 条件什么时候执行呢?
如果所有的case语句(通道操作)被阻塞,那么select语句将阻塞直到这些case条件的一个不阻塞(通道操作),case块执行。如果有多个case块(通道操作)都没有阻塞,那么运行时将随机选择一个不阻塞的case块立即执行。
为了演示上面的程序,我们开启两个协程并传入对应的通道变量. 然后我们写一个带有两个case操作的select语句。 一个case操作从chan1
读数据,另外一个从chan2
读数据。这两个通道都是无缓冲的, 读操作将被阻塞 (那么写操作呢). 所以select语句将阻塞. 因此select
将等待,直到有 case
语句不阻塞。
当程序执行到 select
语句后, 主线程将阻塞并开始调度service1
和service2
协程。 service1
休眠 3 秒
后未阻塞的把数据写入通道 chan1
. 与其类似, service2
等待 5 秒
后未阻塞的把数据写入通道 chan2
. 因为 service1
比 service2
早一步执行完毕, case 1 将首先调度执行,其他的cases块 (这里指 case 2)将被忽略。 一旦case块执行完毕, main
线程将开始继续执行。
上述程序真实模拟了一个数百万请求的服务器负载均衡的例子,它从多个有效服务中返回其中一个响应。 使用协程,通道和select语句, 我们可以向多个服务器请求数据并获取其中最快响应的那个。
为了模拟上面哪个case块率先返回数据,我们可以直接去掉Sleep
函数调用。
https://play.golang.org/p/giSkkqt8XHb
以下是上面程序的运行结果 (你可能有不一样的结果)
main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs
有时也会出现这样的结果
main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs
发生这种情况的原因是 chan1
和 chan2
操作几乎同时发生, golang在代码执行和调度上有时候可能会有差异。
为了证明当所有case块都是非阻塞的时候,golang会随机选择一个代码块执行打印reaponse,我们使用缓冲通道来改造程序。
https://play.golang.org/p/RLRGEmFQP3f
以下是上面程序的运行结果
main() started 0s
Response from chan2 Value 1 0s
main() stopped 1.0012ms
也可能是这种结果
main() started 0s
Response from chan1 Value 1 0s
main() stopped 1.0012ms
在上面的程序中,两个通道在其缓冲区中都有两个值。因为我们向容量为2的缓冲区通道分别发送了两个值, 所以这些通道发送操作不会阻塞并且会执行下面的select块。 select块中的所有case操作都不会阻塞,因为每个通道中都有两个值,而我们的case操作只需要取出其中一个值。因此,go运行时会随机选择一个case操作并执行其中的代码。
default
case块
像 switch
一样, select
语句也有 default
case块. default case 块 是非阻塞的. 不仅如此, default
case 块可以使 select
语句永不阻塞. 这意味着, 任何通道的 发送
和 接收
操作 (不管是缓冲或者非缓冲) 都不会阻塞当前线程。
如果有case块的通道操作是非阻塞,那么 select
会执行其case块.如果没有那么select将默认执行default
块.
https://play.golang.org/p/rFMpc80EuT3
在上面的程序中,因为通道是非缓冲的,case块的通道操作都是阻塞的,所有 default
块将被执行。如果上面的 select
语句没有default
块,select
将阻塞,没有response会被打印出来。
如果带有default
, select
将是非阻塞的, 调度器将不会从主线程转而调度其他协程。 但是我们可以使用 time.Sleep
改变这一点。 通过这种方式,主线程将把调度权转移到其他协程,在其他协程执行完毕后,调度权从新回到主线程手里。当主线程重新执行的时候,通道里面已经有值了,case操作将不会阻塞。
https://play.golang.org/p/eD0NHxHm9hN
以下是上面程序的执行结果
main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s
有时可能是这种结果
main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s
Deadlock
default
块在通道操作阻塞的时候是非常有用的,他可以避免死锁。 同时由于 default
块的非阻塞特性,Go可以避免在其他协程阻塞的时候去调度其他协程,从而避免死锁。
https://play.golang.org/p/S3Wxuqb8lMF
通道的发送操作也类似,, default
可以在其他协程不能被调度的时候被执行,从而避免死锁。
☛ nil
通道
我们知道通道的默认值是 nil
. 所以我们不能在一个nil通道上执行读写操作。并且在一个 nil
通道被用在select
语句上时候, 它可能会抛出下面示例中的错误。
https://play.golang.org/p/uhraFubcF4S
从上面的执行结果上来看,我们知道 select (no cases)
意味着select
语句其实是空的。因为 一个nil通道的case块会被忽略. 随着空 select{}
语句阻塞 main
线程 , service
协程将被调度, 其中 nil
通道的操作将抛出 chan send (nil chan)
错误. 为了避免上面的问题,我们应该使用 default
块。
https://play.golang.org/p/upLsz52_CrE
上面的程序虽然忽略了 case
块 ,但是却立即执行了 default
语句。 因此调度器没来得及去调度 service
协程。 但是这种写法真的不好。 你应该时刻去检查通道是不是nil
。
☛ 添加超时操作
上面的程序只有 default
块被执行,其实没什么作用。 有时候我们想要的是在规定的时间内必须有服务数据被返回,如果超过规定时间未返回再去执行 default
块 。这可以通过在一个case块上使用一个在定义时间后不阻塞的通道来实现。 time
包的 After
函数可以返回这样的一个通道。让我们看下面的例子。
https://play.golang.org/p/mda2t2IQK__X
上面的程序在2秒后返回以下结果
main() started 0s
No response received 2.0010958s
main() stopped 2.0010958s
在上面的程序中, <-time.After(2 * time.Second)
在2秒后将不再阻塞并返回时间值。 我们其实对此通道的返回值不感兴趣.。此操作更像一个协程阻塞了2秒,等于说我们其实有三个协程,并且time操作是最先执行完毕并不再阻塞。 因此time操作对应的case块将被率先执行。
如果你不想等待一个服务执行太长时间的时候,上面的方法是非常有用的。如果我们把程序修改成 10 * time.Second
, 不出意料的话,那么 service1
将被打印。
☛ 空 select
和 for{}
这样的空循环很像, 空 select{}
语法也是有效的。但是有一点必须要说明。我们知道select
将被阻塞除非有case块没有阻塞。因为select{}
没有 case
非阻塞语句, 主线程将阻塞并可能会导致死锁。
https://play.golang.org/p/-pBd-BLMFOu
在上面的程序中我们知道 select
将阻塞 main
线程,调度器将会调度service
这个协程。在service执行完毕后,调度器会再去调度其他可用的协程,但是此时已经没有可用的协程,主线程也正在阻塞,所以最后的结果就是发生死锁.
main() started
Hello from service!
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
program.Go:16 +0xba
exit status 2
☛ WaitGroup
有一种业务场景是你需要知道所有的协程是否已执行完成他们的任务。这个和只需要随机选择一个条件为true的select不同,他需要你满足所有的条件都是 `true` 才可以激活主线程继续执行。 这里的 条件 指的是非阻塞的通道操作。
WaitGroup 是一个带着计数器的结构体
,这个计数器可以追踪到有多少协程创建,有多少协程完成了其工作。当计数器为0的时候说明所有协程都完成了其工作。
让我们来通过一个例子了解上面的概念
https://play.golang.org/p/8qrAD9ceOfJ
在上面的程序中, 我们创建了一个sync.WaitGroup
类型的空结构体(带着0值字段) wg
。 WaitGroup 结构体有一些像noCopy
, state1
和 sema
这样的内部字段。 这个结构体也有三个公开方法: Add
, Wait
和 Done
.
Add
方法的参数是一个变量名叫delta
的 int
类型参数,主要用来内部计数。 内部计数器默认值为0. 它用于记录多少个协程在运行。当WaitGroup创建后,计数器值为0,我们可以通过给Add
方法传int类型值来增加它的数量。 记住, 当协程建立后,计数器
的值不会自动递增 ,因此需要我们手动递增它。
Wait
方法用来阻塞当前协程。一旦 计数器
为0, 协程将恢复运行。 因此,我们需要一个方法去降低计数器的值。
Done
方法可以降低计数器的值. 他不接受任何参数,因此,它每执行一次计数器就减1。
上面的例子中, 我们在创建 wg
变量后, 运行了三次for循环,每次运行的时候我们创建一个协程并给计数器加1。 这意味着现在我们有三个协程在等待运行并且WaitGroup的计数器值为3。注意我们传给协程函数的是一个指针,这是因为一旦在协程内部工作完成后,我们需要通过调用Done
方法去降低计数器的值。 如果 wg
通过值复制方式传过去, 因为传递的是一个拷贝,主线程中的wg
将不会得到修改。
在for
循环执行完成后,我们通过调用 wg.Wait()
去阻塞当前主线程,并把调度权让给其他协程,直到计数器值为0之后,主线程才会被再次调度。我们在另外三个协程中通过Done
方法把计数器值降为0,此时主线程将再次被调度并开始执行之后的代码。
下面是程序运行结果
main() started
Service called on instance 2
Service called on instance 3
Service called on instance 1
main() stopped
因为协程执行顺序的不同,上面的结果可能和你会有出入。
Add
方法接收int
类型参数,这意味着delta变量可能会接收负数,更多资料可以访问这个文档 here.
☛ 工作池
顾名思义, 一个工作池
并发执行某项工作的协程集合。 在 WaitGroup
章节, 我们已经用到的多个协程执行一个任务,但是他们并没有执行特定的工作,只是sleep了一下。 如果你向协程中传一个通道,他们可以去完成一些工作,变成一个工作池。
所以工作池其实就是维护了多个工作协程,这些协程的功能是可以收到任务,执行任务并返回结果。他们完成任务后我们就可以收到结果。这些协程使用相同的通道来达到自己的目的。
让我们看下面一个例子:
https://play.golang.org/p/IYiMV1I4lCj
下面我来一步步解释执行原理
sqrWorker
是一个带有tasks
通道,results
通道 和id
三个参数的协程函数。这个协程函数的任务是把从tasks
通道接收到的数字的平方发送到results
通道。- 在主函数中,我们创建了两个带缓冲区,容量为10的通道
tasks
andresult
。因此在缓冲区被充满之前,任何操作都是非阻塞的。所以有时候设置一个大点的缓冲区是个好办法。 - 然后我们循环创建多个
sqrWorker
协程,并传入tasks
通道,results
通道 和id
三个参数,用来传递和获取协程执行前后的数据。 - 接着我们向
tasks
非阻塞通道放入5个任务数据。 - 因为我们已经向任务通道放入的数据,所以我们可以关闭它,虽然这个操作不是必须的,但是如果以后运行中出现错误的话可以防止通道range带来的死锁问题。
- 然后我们开启循环5次从
results
通道接收数据,因为目前通道缓冲区没有数据,所以通道读取操作造成主线程阻塞,调度器将调度工作池的协程,直到有数据添加到results
通道。 - 当前我们有3个work协程在工作,我们使用了sleep操作来模拟阻塞操作,所以调度器在某一个阻塞的时候会去调用其他的work协程,当某个work协程sleep完成后会把计算数字的平方的结果数据放入
results
缓冲无阻塞通道。当3个协程依次交替把task通道的任务都完成后,for range
循环将完成,并且因为之前我们已经关闭了任务通道,所以协程也不会发生死锁。调度器将继续返回调度主线程。 - 有时候所有的工作协程可能都在阻塞,此时调度器将去调度主线程,直到
results
通道再次为空。 - 当所有work协程都完成任务退出后,主线程将继续拿到调度权并打印
results
通道剩下的数据,继续之后代码的执行。
上面的例子可能理解有点困难,但是却很好的解释了多个协程怎么在相同的通道上协同完成某项工作。当工作任务有阻塞的时候,协程发挥的作用就很大。如果你移除 time.Sleep()
,那么某一个协程将会独自去完成任务并且等到range循环全部执行完毕,本协程执行完毕才会去调度其他协程,没有达到多协程执行任务的目的。
因为系统运行速度的不同,你可能得到的运行结果和我的不一样。因为即使全部工作协程都被阻塞的时间在微秒级别,主线程都有可能会得到重新调度的机会。
所以现在我们可以使用 另外一个协程同步概念WaitGroup
来重写上面的例子,这样的话我们会得到一样的结果,但是可能没有通道实现那么优雅。
https://play.golang.org/p/0rRfchn7sL1
上面的结果看起来非常整洁,因为在主线程将被wg.Wait()
阻塞直到所有的结果数据都存入results
通道,当主线程再次从results
通道读取数据的时候都是无阻塞的。 使用 waitGroup
, 我们能避免许多(不必要)的上下文切换(调度), 但是这样做的代价是你必须等所有的工作协程完成后才能调度主线程.
☛ Mutex
互斥是Go中一个简单的概念。在我解释它之前,先要明白什么是竞态条件。 goroutines 都有自己的独立的调用栈,因此他们之间不分享任何数据。但是有一种情况是数据存放在堆上,并且被多个goroutines使用。 多个goroutines试图去操作一个内存区域的数据会造成意想不到的后果.。下面我们展示一个简单的例子:
https://play.golang.org/p/MQNepChxiEa
在上面的例子中,我们创建1000个goroutines用来并发的增加一个变量i
的值。因此我们使用WaitGroup
,目的是在1000个goroutines结束后 i 的值为1000 。 wg.Wait()
被调用后,主线程开始执行打印变量 i 的值,让我们看下结果:
value of i after 1000 operations is 937
是不是很惊讶,为什么得到的值会小于1000? 似乎有些goroutines 没有执行。事实上是我们的程序出现了竞态条件, 让我们看下可能会发生的事情,
i = i + 1
这个计算有3步
- (1) 得到 i 的值
- (2) 给 i 的值加1
- (3) 更新 i 的值
让我们想象一下多个goroutines在这三个步骤之间调用会发生什么. 例如让我们从1000个goroutines中拿出两个举个例子,标明 G1 和 G2。
G1 开始执行的时候 i 为0,运行两步之后 i
现在是 1
. 但是在G1执行第三步更新 i
的值的时候,新的协程G2被调度并且执行了3个步骤。 因此现在 i 的值为1。现在G1又一次被调度从第二步开始更新了 i 的值。理想情况下,2个协程调度后 i 的值应该是2,但是事实却不是这样。因此我们可以猜想到为什么结果不是1000。
目前为止我们学习到的调度方式都是协同调度。在 concurrency
章节我们提到,除非一个协程阻塞,否则其他协程是没有机会获得调度的。那么 i = i + 1
也没有阻塞, 为什么Go的调度器会去调度其他协程呢?
有关答案请点击这个链接 stackoverflow. 在任何情况下,都不应该依赖Go的调度算法,而应该实现自己的逻辑来同步不同的goroutine.
实现方法之一就是使用我们上面提到的互斥锁。互斥锁是一个编程概念,它保证了在同一时间只能有一个线程或者协程去操作同一个数据。当一个协程想要操作数据的时候,必须获取该数据的一个锁,操作完成后必须释放锁,如果没有获取到该数据的锁,那么就不能操作这个数据。
在Go中,互斥数据结构( map) 由sync
包提供。在Go中,多协程去操作一个值都可能会引起竞态条件。我们需要在操作数据之前使用 mutex.Lock()
去锁定它,一旦我们完成操作,比如上面提到的 i = i + 1
,我们就可以使用 mutext.Unlock()
方法解锁。如果在锁定的时候,有一个协程想要读写 i 的值,那么此协程将阻塞 直到前面的协程完成操作并解锁数据。因此在某一时刻有且仅有一个协程可以操作数据,从而避免竞态条件。记住,任何锁之间的变量在解锁之前对于其他协程都不是可用的。
让我们使用互斥锁修改上面的例子
https://play.golang.org/p/xVFAX_0Uig8
在上面的程序中,我们创建了一个互斥锁变量m
,并把它的指针传递给所有已创建的协程。 在协程内部,当我们要开始操作i
变量的时候,我们先通过m.Lock()
获得锁,操作完成后我们使用m.Unlock()
释放锁。下面是程序运行结果
value of i after 1000 operations is 1000
从上面的结果来看互斥锁可以帮助我们解决竞态条件。 但首要规则是避免goroutine之间共享资源.
在Go中,你可以使用命令行
Go run -race program.Go
去检测你的程序有没有发生竞态条件。 点击这里了解更多 here.
并发应用场景
这有很多使用并发的方式可以简化我们的编程开发。下面介绍一些使用并发加速编程的技巧和概念。
生成器
使用通道,我们可以更好地实现生成器。如果生成器的计算开销很大,那么我们也可以并发生成数据。这样,生成数据的逻辑就不会阻塞主程序。比如生成斐波那契数列。
https://play.golang.org/p/1_2MDeqQ3o5
我们通过使用 fib
函数, 获得一个可迭代返回数据的通道。在fib
函数内部, 我们创建并返回一个带缓冲的 只读
通道。函数最后将把双向通道转换成单向通道返回。在协程函数内部,我们循环计算斐波那契数字并发送到通道中。一旦循环完成,我们将关闭通道。在主线程中,我们使用 for range
迭代fib
函数的返回值,因为返回值是一个通道,我们就可以直接迭代并打印它。
扇入 & 扇出
扇入是一种多路复用的策略,把几个通道的输入整合到一个输出的通道。扇出是一种多路分解策略,将单个通道的数据分散到多个通道。
package main
import (
"fmt"
"sync"
)
// 从切片中读取元素,写入intput channel
func getInputChan() <-chan int {
//实例化容量为100,int类型的 input channel
input := make(chan int, 100)
// 用来写到通道的数据
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
// 启动协程,把数字写到input channel
go func() {
for num := range numbers {
input <- num
}
// 数据写入完毕,关闭channel
close(input)
}()
return input
}
// 把从input channel 读到的数字,做平方运算,再写入output channel
func getSquareChan(input <-chan int) <-chan int {
// 实例化容量为100的int类型的output channel
output := make(chan int, 100)
// 启动协程
go func() {
// 遍历input channel,把读到的数字,平方运算,写入output
for num := range input {
output <- num * num
}
//关闭output channel
close(output)
}()
return output
}
// 返回 对`outputsChan` 通道合并之后的通道
// 这会产生扇入通道
// 这是一个可变参数的函数
func merge(outputsChan ...<-chan int) <-chan int {
// 申明 WaitGroup
var wg sync.WaitGroup
// 实例化 merged 通道
merged := make(chan int, 100)
// 增加一个计数器,计数器的参数为outputsChan的长度
// 因为我们将会创建多个goroutine,其中goroutine的数量就是要准备进行合并的通道的数量
wg.Add(len(outputsChan))
// 从sc channel读取数据,写入到merged 通道
output := func(sc <-chan int) {
// 遍历
for sqr := range sc {
merged <- sqr
}
//一旦sc通道关闭,
//在`WaitGroup`上调用`Done`来递减计数器
wg.Done()
}
//把`output`函数运行为groutines,
// 启动n个协程
//其中n等于作为函数参数接收的通道数
//这里我们在`outputsChan`上使用`for range`循环,因此无需手动告诉`n`
for _, optChan := range outputsChan {
go output(optChan)
}
// 一旦完成,运行goroutine关闭 merged 通道
go func() {
// 等到WaitGroup结束
wg.Wait()
close(merged)
}()
return merged
}
func main() {
//步骤1:获取输入数字通道
//通过调用`getInputChan`函数,它运行一个goroutine,它将数字发送到返回的通道
chanInputNums := getInputChan()
//步骤2:对多个goroutine进行 `扇出` 平方操作
// 这可以通过多次调用`getSquareChan`函数来完成,其中单个函数调用返回一个通道,该通道发送由`chanInputNums`通道提供的数字的平方
//`getSquareChan`函数在内部运行goroutine,同时运行平方操作
chanOptSqr1 := getSquareChan(chanInputNums)
chanOptSqr2 := getSquareChan(chanInputNums)
//步骤3:扇入(合并)`chanOptSqr1`和`chanOptSqr2`输出到合并频道
// 这是通过调用`merge`函数实现的,该函数将多个通道作为参数
// 并使用`WaitGroup`和多个goroutines来接收平方数,我们可以发送平方数
//到 `merged` 通道,并关闭
chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)
//步骤4:计算0到9之间的所有整数的平方再求和,大约是'285`
//这是通过在`chanMergedSqr`上使用`for range`循环来完成的
sqrSum := 0
//运行直到`chanMergedSqr`或合并频道关闭
//当所有goroutines推送到合并频道完成时,在`merge`函数中发生
for num := range chanMergedSqr {
sqrSum += num
}
//步骤5:当`for循环'完成执行时,在`chanMergedSqr`通道关闭之后打印总和
fmt.Println("Sum of squares between 0-9 is", sqrSum)
}
https://play.golang.org/p/hATZmb6P1-u
我不打算解释上述程序是如何工作的,因为我在程序中添加了注释来解释这一点。 以上程序产生以下结果
Sum of squares between 0-9 is 285
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: