条件变量-总是与锁一块使用的条件变量

未匹配的标注

前言

条件变量-总是与锁一块使用的条件变量

条件变量:条件变量的作用并不保证在同一时刻仅有一个go程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的go程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此 条件变量总是与锁一块使用。

条件变量

在讲解条件变量之前,先回顾一下前面我们所涉及的“生产者消费者模型”:

package main

import "fmt"

//只写,不读。
func producer(out chan<- int)  {
   for i:= 0; i < 10; i++ {
      out <- i*i                
   }
   close(out)
}
//只读,不写
func consumer(in <-chan int)  {
   for num := range in {        
      fmt.Println("num = ", num)
   }
}
func main()  {
   ch := make(chan int)     // 创建一个双向channel
   go producer(ch)          // 生产者,产生数据,写入 channel
   consumer(ch)             // 消费者,从channel读数据,打印到屏幕
}

这个案例中,虽然实现了生产者消费者的功能,但有一个问题。如果有多个消费者来消费数据,并且并不是简单的从channel中取出来进行打印,而是还要进行一些复杂的运算。在consumer( )方法中的实现是否有问题呢?如下所示:

package main

import "fmt"
import "sync"
import "time"

var sum int 

func producer(out chan<- int) {
for i := 0; i < =100; i++ {
    out <-i
}
close(out);
}

// 此chanel 只能读,不能写
func consumer(in <-chan int) {
for num := range in {
    sum +=num
}
fmt.println(“sum = ”, sum) 
}

func main() {

    ch:= make(chan int) // 创建一个双向通道
 go producer(ch)        // go程1,生产者,生产数字,写入channel
 go consumer(ch)        // go程2,消费者1
consumer(ch)        // 主go程,消费者。从channel读取内容打印
for  {
   ;
    }
}

在上面的代码中,加了一个消费者,同时在consumer方法中,将数据取出来后,又进行了一组运算。这时可能会出现一个go程从管道中取出数据,参与加法运算,但是还没有算完另外一个go程又从管道中取出一个数据赋值给了num变量。所以这样累加计算,很有可能出现问题。当然,按照前面的知识,解决这个问题的方法很简单,就是通过加锁的方式来解决。增加生产者也是一样的道理。

另外一个问题,如果消费者比生产者多,仓库中就会出现没有数据的情况。我们需要不断的通过循环来判断仓库队列中是否有数据,这样会造成cpu的浪费。反之,如果生产者比较多,仓库很容易满,满了就不能继续添加数据,也需要循环判断仓库满这一事件,同样也会造成CPU的浪费。

我们希望当仓库满时,生产者停止生产,等待消费者消费;同理,如果仓库空了,我们希望消费者停下来等待生产者生产。为了达到这个目的,这里引入条件变量。(需要注意:如果仓库队列用channel,是不存在以上情况的,因为channel被填满后就阻塞了,或者channel中没有数据也会阻塞)。

条件变量:条件变量的作用并不保证在同一时刻仅有一个go程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的go程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。

例如,我们上面说的,如果仓库队列满了,我们可以使用条件变量让生产者对应的goroutine暂停(阻塞),但是当消费者消费了某个产品后,仓库就不再满了,应该唤醒(发送通知给)阻塞的生产者goroutine继续生产产品。

GO标准库中的sync.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。

type Cond struct {
   noCopy noCopy
   // L is held while observing or changing the condition
   L Locker
   notify  notifyList
   checker copyChecker
}

对应的有3个常用方法,Wait,Signal,Broadcast。

1) func (c *Cond) Wait()

该函数的作用可归纳为如下三点:

  • 阻塞等待条件变量满足      
  • 释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。
  • 当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()

2) func (c *Cond) Signal()

  • 单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知。

3) func (c *Cond) Broadcast()

  • 广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。

下面我们用条件变量来编写一个“生产者消费者模型”

示例代码:

package main
import "fmt"
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 创建全局条件变量

// 生产者
func producer(out chan<- int, idx int) {
   for {
      cond.L.Lock()             // 条件变量对应互斥锁加锁
      for len(out) == 3{            // 产品区满 等待消费者消费
         cond.Wait()                // 挂起当前go程, 等待条件变量满足,被消费者唤醒
      }
      num := rand.Intn(1000)    // 产生一个随机数
      out <- num                // 写入到 channel 中 (生产)
      fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out))
      cond.Signal()             // 唤醒 阻塞的 消费者
      cond.L.Unlock()               // 生产结束,解锁互斥锁
      time.Sleep(time.Second)       // 生产完休息一会,给其他go程执行机会
   }
}
//消费者
func consumer(in <-chan int, idx int) {
   for {
      cond.L.Lock()             // 条件变量对应互斥锁加锁(与生产者是同一个)
      for len(in) == 0 {        // 产品区为空 等待生产者生产
         cond.Wait()                // 挂起当前go程, 等待条件变量满足,被生产者唤醒
      }
      num := <-in                   // 将 channel 中的数据读走 (消费)
      fmt.Printf("---- %dth 消费者, 消费数据 %3d,公共区剩余%d个数据\n", idx, num, len(in))
      cond.Signal()             // 唤醒 阻塞的 生产者
      cond.L.Unlock()               // 消费结束,解锁互斥锁
      time.Sleep(time.Millisecond * 500)        //消费完 休息一会,给其他go程执行机会
   }
}
func main() {
   rand.Seed(time.Now().UnixNano())  // 设置随机数种子

   product := make(chan int, 3)      // 产品区(公共区)使用channel 模拟
   cond.L = new(sync.Mutex)          // 创建互斥锁和条件变量

   for i := 0; i < 5; i++ {          // 5个消费者
      go producer(product, i+1)
   }
   for i := 0; i < 3; i++ {          // 3个生产者
      go consumer(product, i+1)
   }
   for {                            // 主go程阻塞 不结束
    runtime.GC()
}
}
  1. main函数中定义quit,其作用是让主go程阻塞。
  2. 定义product作为队列,生产者产生数据保存至队列中,最多存储3个数据,消费者从中取出数据模拟消费
  3. 条件变量要与锁一起使用,这里定义全局条件变量cond,它有一个属性:L Locker。是一个互斥锁。
  4. 开启5个消费者go程,开启3个生产者go程。
  5. producer生产者,在该方法中开启互斥锁,保证数据完整性。并且判断队列是否满,如果已满,调用wait()让该goroutine阻塞。当消费者取出数后执行cond.Signal(),会唤醒该goroutine,继续生产数据。
  6. consumer消费者,同样开启互斥锁,保证数据完整性。判断队列是否为空,如果为空,调用wait()使得当前goroutine阻塞。当生产者产生数据并添加到队列,执行cond.Signal() 唤醒该goroutine。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~