线程模型

线程模型

  • 最近在写有关 go 的线程,但是突然一下子就卡住了,之前明明看过很多文章,也写过协程池,为什么一到实战就不行了呢,好吧,其实是写的不多,但是快速上手也很重要,为什么不偷懒下直接套模板呢,以下是我总结的一些可以直接套用的简易模板

前置知识

  • gorontine 协程
  • channel 通道

快速开始

  1. 一个十分简单的例子
func main() {
    // 使用 sync.WaitGroup 来替代 time.Sleep()
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("hello")
    }()
    wg.Wait()
}

尝试开三个协程,一个生产者,一个消费者,生产者负责组装发送信息,消费者负责接收计算

// producer 负责组装发送信息
func producer(nums ...int) <-chan int {
   inCh := make(chan int, len(nums))
   go func() {
      defer close(inCh)
      for _, num := range nums {
         inCh <- num
      }
   }()
   return inCh
}

// consumer 消费者
func consumer(inCh <-chan int) <-chan int {
   outCh := make(chan int, len(inCh))
   go func() {
      defer close(outCh)
      for in := range inCh {
         outCh <- in*in
      }
   }()
   return outCh
}

func main() {
   // 将数据组装为通道 -- 意味着可以被多组消费
  in := producer(1, 2, 3, 4)
   // 进行消费
  out := consumer(in)
   // 打印输出
  for i:= range out {
      fmt.Println(i)
   }
}

考虑多种情况

1. M个接收者和一个发送者,发送者通过关闭用来传输数据的通道来传递发送结束信号


type mc struct {
    cond *sync.Cond
    done bool
}

func New() *mc {
    return &mc{
        cond: sync.NewCond(&sync.Mutex{}),
        done: false,
    }
}

// producer 一个发送者
func (m *mc) producer(nums ...int) <-chan int {
    inCh := make(chan int, len(nums))
    go func() {
        m.cond.L.Lock()
        defer func() {
            close(inCh)
            m.cond.L.Unlock()
            m.cond.Broadcast()
            m.done = true
        }()
        for _, num := range nums {
            inCh <- num
        }
    }()
    return inCh
}

func (m *mc) consumer(inCh <-chan int) <-chan int {
    outCh := make(chan int, len(inCh))
    go func() {
        m.cond.L.Lock()
        defer func() {
            defer m.cond.L.Unlock()
            close(outCh)
        }()

        for !m.done {
            m.cond.Wait()
        }
        for ch := range outCh {
            outCh <- ch
        }
    }()
    return inCh
}

func merge(chs ...<-chan int, ) <-chan int {
    var wg sync.WaitGroup
    // 将所有数据最后集中到一个通道中
    outCh := make(chan int, len(chs))

    // 将所有数据回收
    collect := func(in <-chan int) {
        defer wg.Done()
        for n := range in {
            // 将数据传入通道
            outCh <- n
        }
    }

    wg.Add(len(chs))
    for _, ch := range chs {
        go collect(ch)
    }
    go func() {
        wg.Wait()
        close(outCh)
    }()

    return outCh
}

func main() {
    m := New()
    inCh := m.producer(1, 2, 3, 4, 5, 6)
    out1 := m.consumer(inCh)
    out2 := m.consumer(inCh)
    out3 := m.consumer(inCh)

    for i := range merge(out1, out2, out3) {
        fmt.Println(i)
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!