大话通道
稍微有编码常识的同学,都会意识到程序并非完全按照纯代码逻辑顺序执行。有多线程多进程经验,知道程序执行往往表现的像无规律的交叉,而且每次重新来过,还体现不一样。 本文以通道为引子,意直白讲述并发同步
内存顺序
编译器(在编译时刻)优化和CPU处理器优化(运行时),会调整指令的执行地顺序。这导致指令执行顺序与代码指定的顺序不完全一致。所以当你认为你的代码是按照书写的逻辑执行时,事实有可能并非如此,尤其是在高并发的情况下。在go语言中表现为,指令执行顺序(指令顺序,称之为内存顺序)的调整,可能会影响到其它协程行为。
并发同步
为了应对这些调整,需要同步技术。换而言之,对于某些调整代码会影响到最终输出结果的情况,必须作出内存顺序保证。非go语言通常会用到锁和原子操作,以及适用于多个进程之间或多个主机之间,用网络或文件读写来实现并发同步。
锁基本上有这几种。互斥锁有点独的味道,无论是读还是写,都会阻塞,即有人在读其它人别说写了,想看看(读)都没门。读写锁,单写多读模型,除了写时阻塞,你读时,我可读,他可读,大家谁读都没问题,不阻塞。更进一步,其实站在锁的立场,它需要知道,谁在用他,用完后给哪些需要他的人。换而言之,得通知可能得锁人,让那些没得锁的人继续等待。条件锁出现了,它依赖于前面两种锁,通常用来协调想要访问共享资源的线程,在对应的共享资源状态发送变化时,通知其它因此而阻塞(在等待)的线程。
中断 代码从运行状态切换到非运行状态称之为中断
原子操作通常是 CPU 和操作系统提供支持,执行过程中不会中断
原子操作 互斥锁只能保证临界区代码的串行执行,不能保证代码执行的原子性。
而原子执行过程中不中断,可以完全消除竞态条件,从而绝对保证并发安全性,无锁直接通过CPU指令直接实现。
顺序保证
上面说了那么多,就一目的实现内存模型(指令执行)顺序保证,即确保某些情况下顺序不被调整(或即便调整了也不影响最终的结果正确性)。Go内存模型除了提供主流的锁或原子操作做出顺序保证外,还提供了通道操作顺序保证。
通道
通道简单理解,就是由读,写,缓冲三个队列组成的数据类型。其实它的设计逻辑,以无缓冲通道为例,假定有一空杯,大家喝水都用它,加锁那套是要喝水的人时不时要看看,有没有人在用那个杯子,没有人用它用完则放回原地。而通道不一样,它是杯子到手了,我用完了,直接传递给下一个要用的人,当然你得保证喝完之后杯子里有水。二者的区别在于,前者需要锁防止大家争抢水杯,而后者则你不需要去找水,你只需要告诉水杯我要喝水,上一个喝水的人,喝完之后,会灌满递给你。前者强调共享,后者重在传递。所以不要让计算通过共享内存来通讯,而应该让它们通过通讯来共享内存。
最快到达
现实生活中,发出请求并不总是及时响应,有时面对多源数据,我们会发出多个请求,只采用其中响应最快的那个。
import (
"fmt"
"math/rand"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
startTime := time.Now()
// 采用缓冲通道,模拟同步发出多个请求
c := make(chan int32, 5)
for i := 0; i < cap(c); i++ {
go source(c)
}
// 只取一个最快的响应结果
rnd := <-c
// 测量最快时间差
fmt.Println(time.Since(startTime))
fmt.Println(rnd)
}
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3)+1
// 随机模拟请求的响应时间
time.Sleep(time.Duration(rb) * time.Second)
c <- ra
}
Future/Promise
Future/promise 常常用在请求/回应场合,以下示例 sumSquares
函数调用的两个实参请求并发进行。 每个通道读取操作将阻塞到请求返回结果为止。 两个实参总共需要大约3秒钟(而不是6秒钟) 准备完毕(以较慢的一个为准)
package main
import (
"fmt"
"math/rand"
"time"
)
func longTimeRequest() <-chan int32 {
r := make(chan int32)
go func() {
time.Sleep(time.Second * 3) // 模拟一个工作负载
r <- rand.Int31n(100) // 随机正整数范围
}()
return r
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano()) // 准备随机初始种子
start := time.Now() // 计时
a, b := longTimeRequest(), longTimeRequest() // goroutine分发,并发执行
fmt.Println(sumSquares(<-a, <-b), time.Since(start)) // 输出类似 10084 3.000541298s
}
- 通知
互斥锁
将容量为1的缓冲通道,作为互斥锁,下面示例发送操作加锁
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // 容量必须为1,二元信号
counter := 0
increase := func() {
mutex <- struct{}{} // 发送通道加锁
counter++
<-mutex // 解锁
}
increase10 := func(done chan<- struct{}) {
for i := 0; i < 10; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase10(done)
go increase10(done)
<-done; <-done
fmt.Println(counter) // 20
}
计数信号量
计数信号量经常被使用于限制最大并发数,下面以酒吧喝酒示例
package main
import (
"log"
"math/rand"
"time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeCustomer(c int, seat Seat) {
log.Print("顾客#", c, "进酒吧了")
log.Print("++ 顾客", c, "坐在",seat,"号位开始喝酒#", )
time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
log.Print("-- 顾客#", c, "离开了", seat, "号座位")
bar <- seat // 离开座位
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 3) // 此酒吧最多能同时服务3个客人
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // 酒吧放置3把椅子,此处不会阻塞
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
seat := <-bar24x7 // 等待,当有空位时允许进
go bar24x7.ServeCustomer(customerId, seat)
}
select {} // 主协程永久阻塞,防止退出
}
- 对战
其它
用通道实现请求/应答模式,使用缓冲,并不能保证结果顺序与分发顺序一致。道理很简单,在同步发出多个请求,最先响应的并不一定是第一个请求(各个请求响应耗时不一),你不能根据响应的结果来断定通道是否读取完毕。读写一致是最好的保证。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: