学习 Go并发模型

前言

1.简单例子

  • 将数组内的数据转变为他们的平方
  • 分解以上过程为三个步骤
    • 生产信息 producer(),遍历切片
    • 处理信息 square(),计算平方
    • 消费信息 main(),消费

1.生产信息

func producer(nums ...int) <-chan int {
    // 创建带缓冲通道
    out := make(chan int,10)
    // 通过协程将数据存储到通道中
    go func(){
        defer close(out) //最后关闭通道
        for _,num := range nums {
            out <- num
        }
    }()
    return out
}

2.处理信息

func square(inCh <-chan int) <-chan int {
    out := make(chan int,10)
    go func(){
        defer cloes(out)
        for n := range inCh {
            out <- n*n
        }
    }()
    return out
}

3.消费信息

func main() {
    // 先将数据拆分放入通道
    in := producer(1,2,3,4)
    // 处理数据
    ch := square(in)
    // 消费数据
    for ret := range ch {
    fmt.Printf("%3d",ret)
    }
}

扇形模型优化 FAN-IN 与 FAN-OUT

  • FAN-OUT : 多个 goruntine 从同一个通道读取数据,直到该通道关闭
  • FAN-IN :1个 goruntine 从多个通道读取数据,直到该通道关闭

1. FAN-OUT 和 FAN-IN 实践

1.生产者producer() 和 消息处理square()不变

func producer(nums ...int) <-chan int {
    // 创建带缓冲通道
    out := make(chan int,10)
    // 通过协程将数据存储到通道中
    go func(){
        defer close(out) //最后关闭通道
        for _,num := range nums {
            out <- num
        }
    }()
    return out
}

func square(inCh <-chan int) <-chan int {
    out := make(chan int,10)
    go func(){
        defer close(out)
        for n := range inCh {
            out <- n*n
        }
    }()
    return out
}

2. 新增merge() 用来多个square() 操作最后回归到一个通道消费读取— FAN-IN

func merge(cs ...<-chan int) <-chan int {
    out := make(chan int,10)

    // 创建计时器
    var wg sync.WaitGroup
    // 将所有数据回归到一个通道中
    collect := func (in <-chan int){
        defer wg.Done()
        for n := range in {
            out <- n
        }
    }

    wg.Add(len(cs))
    // FAN - IN
    for _,c := range cs {
        go collect(c)
    }

    // 错误方式:直接等待是bug,死锁,因为merge写了out,main却没有读,出现该错误的原因是使用了无缓冲通道,如果要实现这个bug,请将 merge() 中的 make(chan int,10) 改成 make(chan int)
    // wg.Wait()
     // close(out)

    go func(){
        wg.Wait()
        close(out)
    }()

    return out
}

3.修改main(),启动3个square(),一个生产者producer()被多个square()读取 — FAN-OUT

func main() {
    in := producer(1,2,3,4)

    // FAN-OUT  这个时候开启了协程
    c1 := square(in)
    c2 := square(in)
    c3 := square(in)

    // consumer
    for ret :=range merge(c1,c2,c3) {
        fmt.Printf("%3d",ret)
    }
}

3.优化 FAN 模式

  • 不同的场景优化不同,要依据具体的情况,解决程序的瓶颈
  • 但总的来说 不推荐用无缓冲通道,推荐用有缓冲通道

结语

  • 这是一篇学习博客,推荐去看 原文章
  • 谢谢能看到最后
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 4

file 这一部分我在我电脑上跑不会死锁 file 这里in传的是引用,在这里c2,c3是多余的 file 感觉这样比较合理

3年前 评论

@roo_kie

  • 谢谢你能看我的文章,以及同时指出我的错误
  • 你说的那个错误,因为我全部的通道都使用了缓冲通道,你可以试着把 make(chan int,10) 改成 mak(chan int) 再尝试下 wg.Wait() ,这是我文章中的错误,没有好好说明,推荐不要使用无缓冲通道
  • 我并不认为 c2,c3是多余的,这篇文章的目的是学习 FAN-IN 和 FAN-OUT 模型,写c2,c3的目的是:让一个producer创建的生产者通道能够被多个 square()消费。
3年前 评论

@奇迹师 我的理解是这样,在第一个square中in里面的值就已经被消费完了,所以在c2,c3中并没有消费到,如果这里进行debug会发现c2,c3的channel里没有值,所以才感觉这部分是多余的,如果被多个square()消费,我建议每个square启动一个goroutine来竞争。如果有我不理解的地方请指正

3年前 评论
奇迹师 (楼主) 3年前

@roo_kie 谢谢你提到的 在第一个square中里面的值就已经被消费 这个错误,我在最近看文章的时候确实发现了这个问题 参考地址,之前没有回答这个问题是因为不知道如何复现,同时知识面也不是太广.这个是最新写的,不知道能不能够帮到你


type mc struct {
    cond *sync.Cond
    done bool
}

func New() *mc {
    return &mc{
        cond: sync.NewCond(&sync.Mutex{}),
        done: false,
    }
}

// producer 一个发送者
func (m *mc) producer(nums ...int) <-chan int {
    inCh := make(chan int, len(nums))
    go func() {
        m.cond.L.Lock()
        defer func() {
            close(inCh)
            m.cond.L.Unlock()
            m.cond.Broadcast()
            m.done = true
        }()
        for _, num := range nums {
            inCh <- num
        }
    }()
    return inCh
}

func (m *mc) consumer(inCh <-chan int) <-chan int {
    outCh := make(chan int, len(inCh))
    go func() {
        m.cond.L.Lock()
        defer func() {
            defer m.cond.L.Unlock()
            close(outCh)
        }()

        for !m.done {
            m.cond.Wait()
        }
        for ch := range outCh {
            outCh <- ch
        }
    }()
    return inCh
}

func merge(chs ...<-chan int, ) <-chan int {
    var wg sync.WaitGroup
    // 将所有数据最后集中到一个通道中
    outCh := make(chan int, len(chs))

    // 将所有数据回收
    collect := func(in <-chan int) {
        defer wg.Done()
        for n := range in {
            // 将数据传入通道
            outCh <- n
        }
    }

    wg.Add(len(chs))
    for _, ch := range chs {
        go collect(ch)
    }
    go func() {
        wg.Wait()
        close(outCh)
    }()

    return outCh
}

func main() {
    m := New()
    inCh := m.producer(1, 2, 3, 4, 5, 6)
    out1 := m.consumer(inCh)
    out2 := m.consumer(inCh)
    out3 := m.consumer(inCh)

    for i := range merge(out1, out2, out3) {
        fmt.Println(i)
    }
}
3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!