学习 Go并发模型
前言
- 简化并发开发
- 以下代码内容均来自大彬大佬
1.简单例子
- 将数组内的数据转变为他们的平方
- 分解以上过程为三个步骤
- 生产信息
producer(),遍历切片 - 处理信息
square(),计算平方 - 消费信息
main(),消费
- 生产信息
1.生产信息
func producer(nums ...int) <-chan int {
// 创建带缓冲通道
out := make(chan int,10)
// 通过协程将数据存储到通道中
go func(){
defer close(out) //最后关闭通道
for _,num := range nums {
out <- num
}
}()
return out
}
2.处理信息
func square(inCh <-chan int) <-chan int {
out := make(chan int,10)
go func(){
defer cloes(out)
for n := range inCh {
out <- n*n
}
}()
return out
}
3.消费信息
func main() {
// 先将数据拆分放入通道
in := producer(1,2,3,4)
// 处理数据
ch := square(in)
// 消费数据
for ret := range ch {
fmt.Printf("%3d",ret)
}
}
扇形模型优化 FAN-IN 与 FAN-OUT
- FAN-OUT : 多个 goruntine 从同一个通道读取数据,直到该通道关闭
- FAN-IN :1个 goruntine 从多个通道读取数据,直到该通道关闭
1. FAN-OUT 和 FAN-IN 实践
1.生产者producer() 和 消息处理square()不变
func producer(nums ...int) <-chan int {
// 创建带缓冲通道
out := make(chan int,10)
// 通过协程将数据存储到通道中
go func(){
defer close(out) //最后关闭通道
for _,num := range nums {
out <- num
}
}()
return out
}
func square(inCh <-chan int) <-chan int {
out := make(chan int,10)
go func(){
defer close(out)
for n := range inCh {
out <- n*n
}
}()
return out
}
2. 新增merge() 用来多个square() 操作最后回归到一个通道消费读取— FAN-IN
func merge(cs ...<-chan int) <-chan int {
out := make(chan int,10)
// 创建计时器
var wg sync.WaitGroup
// 将所有数据回归到一个通道中
collect := func (in <-chan int){
defer wg.Done()
for n := range in {
out <- n
}
}
wg.Add(len(cs))
// FAN - IN
for _,c := range cs {
go collect(c)
}
// 错误方式:直接等待是bug,死锁,因为merge写了out,main却没有读,出现该错误的原因是使用了无缓冲通道,如果要实现这个bug,请将 merge() 中的 make(chan int,10) 改成 make(chan int)
// wg.Wait()
// close(out)
go func(){
wg.Wait()
close(out)
}()
return out
}
3.修改main(),启动3个square(),一个生产者producer()被多个square()读取 — FAN-OUT
func main() {
in := producer(1,2,3,4)
// FAN-OUT 这个时候开启了协程
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumer
for ret :=range merge(c1,c2,c3) {
fmt.Printf("%3d",ret)
}
}
3.优化 FAN 模式
- 不同的场景优化不同,要依据具体的情况,解决程序的瓶颈
- 但总的来说 不推荐用无缓冲通道,推荐用有缓冲通道
结语
- 这是一篇学习博客,推荐去看 原文章
- 谢谢能看到最后
本作品采用《CC 协议》,转载必须注明作者和本文链接
关于 LearnKu
@roo_kie
make(chan int,10)改成mak(chan int)再尝试下wg.Wait(),这是我文章中的错误,没有好好说明,推荐不要使用无缓冲通道@奇迹师 我的理解是这样,在第一个square中in里面的值就已经被消费完了,所以在c2,c3中并没有消费到,如果这里进行debug会发现c2,c3的channel里没有值,所以才感觉这部分是多余的,如果被多个square()消费,我建议每个square启动一个goroutine来竞争。如果有我不理解的地方请指正
@roo_kie 谢谢你提到的
在第一个square中里面的值就已经被消费这个错误,我在最近看文章的时候确实发现了这个问题 参考地址,之前没有回答这个问题是因为不知道如何复现,同时知识面也不是太广.这个是最新写的,不知道能不能够帮到你