「Golang成长之路」并发之Channel上

一、使用 channel 等待任务结束#

在前面的内容中很多地方使用到了:
time.Sleep(time.Millisecond)
如:

func chanDemo(){
    var channels [10]chan int  //创建channel数组
    for i := 0; i < 10; i++{
        channels[i] = creatworker(i)
    }
    for i := 0; i < 10; i++{
        channels[i] <- 'a' + 1
    }
    time.Sleep(time.Millisecond)
}
func bufferedChannel(){
    ch := make(chan int, 3)
    go worker(0, ch)
    for i := 0; i < 10; i++{
        ch <- 'a' + i
    }
    time.Sleep(time.Millisecond)
}
func channelClose(){
    ch := make(chan int)
    go worker(0, ch)
    ch <- 'a'
    ch <- 'b'
    ch <- 'c'
    ch <- 'd'
    close(ch)
    time.Sleep(time.Millisecond)
}
在这些方法里面我们很容易知道他们运行所消耗的时间,但事实上,很多程序的时间是不能预估的,我们不能一直是用 time 包来对程序运行的时间进行预估,是不靠谱的,所以这里我们有了 “使用 channel 等待任务结束”#

先看这段代码:

func chanDemo(){
   var channels [10]chan int //创建channel数组
   for i := 0; i < 10; i++{
      channels[i] = creatworker(i)
   }
   for i := 0; i < 10; i++{
      channels[i] <- 'a' + 1
  }
   time.Sleep(time.Millisecond)
}

我们需要使用 channel 并发的打印 10 个字母,为了让字母完整打印,我们对程序运行时间进行了预估,让程序运行 1 毫秒就结束;下面我们需要使用
1. 使用 channel 等待任务结束
仍然打印字母 (打印 20 个):部分内容见代码注释
使用’chan bool’的通道来共享通讯来使用内存,告诉 main 任务结束

package main

import (
   "fmt"
)
//定义一个结构体
//包含一个 'chan int ' 的in和 'chan bool'的done
type Worker struct{
   in chan int
   done chan bool  //对done的接和收作为结束的信号
}

func DoWork( id int, c chan int, done chan bool){
   for {
      n := <-c //接受channel的内容
      fmt.Printf("worker %d received %c\n", id, n)
      done <- true  //done接收数据true
  }
}

func createWorker(id int) Worker {
   //建立Worker的一个对象w
   w := Worker{make(chan int),
   make(chan bool),
   }

   go DoWork(id, w.in, w.done) 此处启动goroutine,即并发
   return w
}

func ChanDemo() {

   var workers [10]Worker  //创建10个抽象类型Worker

  for i := 0; i < 10; i++{ 
      workers[i] = createWorker(i) //创建10个Worker的对象,并返回给workers[i]
   }

   for i := 0; i < 10; i++{  //可使用range
      workers[i].in <- 'a' + i   //workers[i].in接受数据
   }

   for _, worker := range workers {
      <-worker.done   //将done的数据发给mian,告知main该任务结束
   }

   for i := 0; i < 10; i++{ //可使用range
      workers[i].in <- 'A'+ i  //workers[i].in接受数据
   }

   for _, worker := range workers{
      <- worker.done  //将done的数据发给mian,告知main该任务结束
   }
}

func main(){
   ChanDemo()
}

打印结果:
worker 0 received a
worker 5 received f
worker 1 received b
worker 6 received g
worker 4 received e
worker 9 received j
worker 8 received i
worker 2 received c
worker 7 received h
worker 3 received d
worker 6 received G
worker 2 received C
worker 3 received D
worker 7 received H
worker 1 received B
worker 4 received E
worker 5 received F
worker 0 received A
worker 9 received J
worker 8 received I

从打印结果看出:是按顺序打印的 (先小写后大写)
这里还有一种方法:

func ChanDemo() {

   var workers [10]Worker

  for i := 0; i < 10; i++{
      workers[i] = createWorker(i)
   }
   for i := 0; i < 10; i++{
      workers[i].in <- 'a' + i
   }
   for i := 0; i < 10; i++{
      workers[i].in <- 'A'+ i
   }
   //将两个<- worker.done 放在一起
   for _, worker := range workers{
      <- worker.done   
      <- worker.done
   }
}

但是需要注意的是:
我们一共创建了 10 个 goroutine,在第二个 for 中就已经向所有 channel 中发送数据,接着第三个 for,又向 channel 中发送数据,这样会死锁,因为第一次 channel 中的数据没有人来接,然后又向 channel 发数据。
解决方法:在 DoWork 函数增加并发,让 done <- true 处于并发执行状态,可随时向 main 发数据。

func DoWork( id int, c chan int, done chan bool){
   for {
      n := <-c //接受channel的内容
      fmt.Printf("worker %d received %c\n", id, n)
      go func() {
        done <- true
     }()
  }
}

2. 使用系统提供的 ‘WaitGroup’等待任务结束
WaitGroup 提供了:Add ()、Wait ()、Done () 方法

package main

import (
   "fmt"
 "sync")

type Worker struct{
   in chan int
   wg *sync.WaitGroup  //引用需要指针
}

func DoWork( id int, c chan int, wg *sync.WaitGroup){
   for {
      n := <-c //接受channel的内容
      fmt.Printf("worker %d received %c\n", id, n)
       wg.Done()  //接和收结束信息
   }
}

func createWorker(id int, wg *sync.WaitGroup) Worker {
   //建立Worker的一个对象w
   w := Worker{make(chan int), wg,}

   go DoWork(id, w.in, wg)
   return w
}

func ChanDemo() {
   var wg sync.WaitGroup

   var workers [10]Worker
   for i := 0; i < 10; i++{
      workers[i] = createWorker(i, &wg)  //指针
   }
   wg.Add(20)  //20个任务
   for i := 0; i < 10; i++{
      workers[i].in <- 'a' + i
   }
   for i := 0; i < 10; i++{
      workers[i].in <- 'A'+ i
   }
   wg.Wait()  //任务结束
}

func main(){
   ChanDemo()
}

打印结果:
worker 0 received a
worker 4 received e
worker 6 received g
worker 2 received c
worker 9 received j
worker 7 received h
worker 0 received A
worker 8 received i
worker 5 received f
worker 3 received d
worker 1 received b
worker 1 received B
worker 9 received J
worker 2 received C
worker 3 received D
worker 4 received E
worker 7 received H
worker 8 received I
worker 6 received G
worker 5 received F

二、select#

之前的内容中,我们使用 channel 都是一个一个的收数据,如果我们需要把多个 channel 同时收,该怎么办?
答案是:Go 语言引入了 select 语句

下面来具体介绍一下 select:
select 的逻辑和 switch 的逻辑类似,他们都有多个 case 分支和 default,但 select 是针对 channel 的,其逻辑是:在多个含有 case 分支的 select 里面,当某时刻相应的 channel 满足发发出数据,让外面接收,就能满足对应 case,接下来就会执行该 case 对应的语句块,如果多个 case 同时都满足条件,则会随机选择其中一个 case,如果所有 case 都不满足则会执行 default
例如:

var activeWorker chan<- int
n := 0
select {
  //c1, c2 为chan int类型
  case n = <-c1:
     fmt.Printf("this is c1:%d\n", n)
  case n = <-c2:
     fmt.Printf("this is c2:%d\n", n)
  case activeWorker <- n:
     hasValue = false
  default:
     fmt.Println("not find channel")
    return
  }

在执行 select 时,程序会将所有的 case 分析一遍,先来看第一个 case,如果此时 c1 发出数据,则第一个 case 可被执行,再看第二个 case,如果此时 c2 发出数据,则第二个 case 可被执行,再看第三个 case,如果此时 n 有值就会将其值发给 activeWorker, 最后来看 default,当上面所有的 case 都不满足时,就会执行 default 的语句块。

下面来看一个完整的 select 的应用:

package main

import (
   "fmt"
 "math/rand" "time")

//控制时间,向channel里面发送消息
func generator() chan int{
   out := make(chan int)
   go func() {
      i := 0
     for{
         //控制发送数据时间间隔
         time.Sleep(time.Duration( rand.Intn(1500) ) * time.Microsecond)
         out <- i
         i++
      }
   }()
   return out
}

//channel接受和打印信息
func DoWork( id int, c chan int){
   for {
    n := <- c  //接受channel的内容
    time.Sleep(time.Second)  //控制打印时间间隔
    fmt.Printf("worker %d received %d\n", id, n)
   }
}

//建立channel,启动并发
func createWorker(id int) chan<- int{
   w := make(chan int)
   go DoWork(id, w)
   return w
}

func main() {
   var c1, c2 = generator(), generator()
   var worker = createWorker(0)
   n := 0
   var Values []int //动态缓存数据
   //tm为程序总时间  tm := time.After(1 * time.Second)
   tick := time.Tick(time.Second)

   for{
      var activeWorker chan<- int
      var activeValue int
      if len(Values) > 0 {
          activeWorker = worker
          activeValue = Values[0]
      }
      select {
      case n = <-c1:
         Values = append(Values, n)
      case n = <-c2:
         Values = append(Values, n)
      case activeWorker <- activeValue:
         Values = Values[1:]
      case <- time.After(2000 * time.Microsecond):  //每两次发送数据时间差超过800毫秒执行一次
         fmt.Println("timeout")
      case <- tick:   //使用tick反映系统状态
         fmt.Println("queue len is:",len(Values))
      case <- tm:  //使用tm控制总时间
         fmt.Println("bey")
         return
     }
   }
}

打印结果:
worker 0 received 132
worker 0 received 133
worker 0 received 134
worker 0 received 136
worker 0 received 135
worker 0 received 136
worker 0 received 137
worker 0 received 138
worker 0 received 137
worker 0 received 138
worker 0 received 139
timeout
worker 0 received 140
worker 0 received 139
timeout
worker 0 received 140
worker 0 received 141
worker 0 received 142
worker 0 received 143
worker 0 received 141
timeout
worker 0 received 142
worker 0 received 144
worker 0 received 145
worker 0 received 143
worker 0 received 144
worker 0 received 145
worker 0 received 146
worker 0 received 146
worker 0 received 147
worker 0 received 147
worker 0 received 148
worker 0 received 148
worker 0 received 149
worker 0 received 149
worker 0 received 150
worker 0 received 151
timeout
worker 0 received 152
worker 0 received 150
worker 0 received 153
worker 0 received 151
worker 0 received 152
worker 0 received 153
worker 0 received 154
worker 0 received 154
timeout
worker 0 received 155
worker 0 received 156
worker 0 received 155
worker 0 received 157
worker 0 received 156
worker 0 received 158
worker 0 received 157
timeout
worker 0 received 158
worker 0 received 159
worker 0 received 159
worker 0 received 160
worker 0 received 161
worker 0 received 162
worker 0 received 160
timeout
worker 0 received 161
worker 0 received 163
worker 0 received 162
timeout
worker 0 received 164
worker 0 received 163
worker 0 received 164
worker 0 received 165
worker 0 received 166
worker 0 received 167
worker 0 received 165
worker 0 received 168
timeout
worker 0 received 166
worker 0 received 169
worker 0 received 167
worker 0 received 170
timeout
worker 0 received 171
worker 0 received 168
timeout
worker 0 received 172
worker 0 received 169
worker 0 received 170
worker 0 received 171
worker 0 received 173
timeout
worker 0 received 174
worker 0 received 172
worker 0 received 175
……
……
……
worker 0 received 2352
worker 0 received 2386
timeout
worker 0 received 2387
worker 0 received 2353
timeout
worker 0 received 2388
worker 0 received 2389
worker 0 received 2354
worker 0 received 2390
worker 0 received 2355
bey
这个程序充分体现了 select 的实际应用

三、在这里总结了几个常见的问题:#

func ChanDemo() {

var workers [10]Worker

  for i := 0; i < 10; i++{
workers[i] = createWorker(i)
}

for i := 0; i < 10; i++{
workers[i].in <- 'a' + i
}

在第一个 for 中,第一步 workers [0] = createWorker (0)

然后就进入这里

func createWorker(id int) Worker {
//建立Worker的一个对象w
  w := Worker{make(chan int),
     make(chan bool),
     }

go DoWork(id, w.in, w.done)
return w
}

在这个函数中我们开了一个 goroutine,同时我们会将 w 返回给 workers [0],然后就进入:

for i := 0; i < 10; i++{
workers[i] = createWorker(i)
}

的第二次,第三次循环……

直到循环结束。

但是这里就有问题了,在这个途中我们一共开了 10 goroutine,但是这 10 goroutine 都处于等待状态 (因为我们还没有给 channel 任何内容,从我们的输出结果可以看出)

  1. 那么这里的 10 个 goroutine 是处于等待状态是不是因为,我们 channel 没有接受到任何信息,所以就会造成 goroutine 的等待?

2. 还有这里:

 func DoWork( id int, c chan int, done chan bool){
    for {
         n := <-c //接受channel的内容
         fmt.Printf("id: %v, chan:%c\n", id, n)
         done <- true
      }
    }

这个死循环,为什么在函数调用后只循环了一遍? 当然这里我知道他是其中一个 goroutine

3. 当然还有一个问题,就是我们在前两个问题在基础上,调用函数 DoWork () 时,也会对应的将 true 发送给与之对应的 workers [i].done 中,然后:

for i := 0; i < 10; i++{
workers[i].in <- 'a' + i
}

for _, worker := range workers {
<- worker.done
}

在这里的第二个 for 中,这里 <- worker.done 全为 true,我们是不是从这里就可以了解到前面的 10 个 goroutine 结束了?

4. 也正是因为这样我们才不需要 time 包,来预计程序的运行时间了?

三、回答#
  1. 是的,它们此时都在等待,等别人从 in 中发送任务数据。

  2. 这是个死循环,一般我们 goroutine 中常会这么写,只要有任务就做。视频里实际上大写字母,小写字母,一共执行两遍。执行多少遍取决于外界,这里是 main 函数,到底发送了多少任务给我这个 worker [i]。

  3. 这里的 true 方向同学搞错了,是 worker 通知 main 函数,说我做完了。<- worker.done 这里是 main 函数接收 worker.done 的数据,如果收到,就说明这个 worker 的事情做完了。

  4. ​是的,理想情况下应该不需要 time 包来预计运行时间。预计的时间会不靠谱。

本作品采用《CC 协议》,转载必须注明作者和本文链接
刻意学习
未填写
文章
128
粉丝
107
喜欢
197
收藏
286
排名:330
访问:2.9 万
私信
所有博文
社区赞助商