Golang 并发编程中条件变量的理解与使用

  (本文提到的channel和读写锁混用导致的隐性死锁,和应用条件变量的代码见文章末尾。)
   现在的讨论情形是一个拥有多个生产者同时还有多个消费者的“生产者-消费者”模型。此时为了解决多个go程同时访问公共区造成的数据混乱,可以加入互斥锁。
   与之前隐性死锁的例子不同的是,当时是将无缓冲的channel和读写锁一起使用,并将这个无缓冲channel作为了公共区,而无缓冲的特性是:负责读和写的两个go程必须同时处于运行态(非阻塞),否则这个channel的两端都会一直阻塞,可是一旦向公共区加了锁,它就不会允许你的读go程和写go程同时访问公共区,由此而产生隐性死锁。但是这个例子不太一样了,它将作为公共区的无缓冲channel改成了有缓冲channel,这样的话对公共区的读写就可以在不同的时刻进行,也就不会产生那种隐性死锁。
   那么从现在开始,所有的go程在访问公共区之前,必须要抢到这把锁,当某一个go程拿到锁以后就可以对公共区进行访问,访问结束后解锁,相当于把锁扔掉,然后所有go程又开始抢这把锁。
  有了这把互斥锁,就能够让这多个go程在访问公共区时,由并行变成串行,从而保证了线程(go程)安全。但是当写入公共区的go程远远多于读go程时,就会产生这样一种情形:公共区的空间已经被写go程给写满了,还没有来得及被读go程给读走,此时很可能又有一个写go程抢到了锁,但是它在尝试向公共区写数据时,由于公共区已满,当前go程就会在此阻塞。也就是说这个写go程不是因为拿不到锁而阻塞,它虽然已经拿到锁了,但是在尝试向公共区写数据的时候阻塞住了。而与此同时,其他所有的go程在访问公共区时都因为拿不到锁而阻塞住了,所以导致了所有的go程都陷入阻塞,就会产生死锁。
   同样地,当读go程远远多于写go程时,公共区的数据全部被读走,写go程还没有来得及向公共区写入数据,此时会有读go程抢到锁并尝试从公共区读取数据,但由于公共区为空,所以当前读go程进入阻塞,而其他go程与此同时由于拿不到锁也会阻塞,这样也会发生和前一种同样的死锁。
  为了解决这个问题,引入了条件变量的概念。
   先来看一下什么是条件变量,以及条件变量中包含哪些方法。GO标准库中的sys.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。

type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
对应的有3个常用方法,Wait,Signal,Broadcast。
1)func (c Cond) Wait()
该函数的作用可归纳为如下三点:
a)阻塞等待条件变量满足
b)释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。
c)当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()
2)func (c
Cond) Signal()
单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知。
3)func (c *Cond) Broadcast()
广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。
再来看一下条件变量的使用流程。
使用流程:

1.  创建 条件变量: var cond    sync.Cond

2.  指定条件变量用的 锁:  cond.L = new(sync.Mutex)

3.  cond.L.Lock()   给公共区加锁(互斥量)

4.  判断是否到达 阻塞条件(缓冲区满/空) —— for 循环判断

    for  len(ch) == cap(ch) {   cond.Wait() —— 1) 阻塞 2) 解锁 3) 加锁

5.  访问公共区 —— 读、写数据、打印 

6.  解锁条件变量用的 锁  cond.L.Unlock()

7.  唤醒阻塞在条件变量上的 对端。 signal()  Broadcast()

      我把这个过程比喻成了公司的员工流动,公共区就是这个公司固定数量的岗位,对公共区的写操作就是员工入职,读操作就是员工离职。互斥锁就是一间办公室,想入职的必须来这间办公室面试,想离职的也必须来这间办公室办离职手续。并且这间办公室只能给一个人面试或者给一个人办离职手续(线程间互斥)。员工要想入职(go程向公共区写数据)就一定要进入这间办公室来面试(加锁),接下来判断是否满足条件(条件变量),如果公司岗位已经满了(公共区被写满),就让他先不必面试,并离开这间办公室(wait函数完成的解锁),但是公司还不能让你走,一直在门外等待(解了锁但没有解除阻塞),防止他走了以后又来面试(占着锁不放)。这样就能让出办公室(多个go程重新抢锁),给其他办离职手续的人让位置,当有人离职后会发出通知(signal方法),让门口等待的人再进去面试(重新获得锁),该员工入职后就会离开办公室(解锁),上岗干活(向公共区写入数据),然后通知准备离职的员工可以去那个办公室办理离职手续(signal方法唤醒阻塞在条件变量上的对端)。
   至于说为什么公共区每次写进来或读出数据以后都要执行signal方法来唤醒对端上阻塞的go程,因为每次我有数据写进去就说明读端现在一定有数据读,一定可以唤醒一个读端的阻塞go程,同样,一旦有数据读出去,就说明写端至少有一个go程可以被唤醒并写入数据。通常signal方法比broadcast更常用一些。

代码:
(1)channel和读写锁混用导致的隐性死锁:
package main

import (
"math/rand"
"time"
"fmt"
"sync"
)

var rwMutex sync.RWMutex // 锁只有一把, 2 个属性 r w

func readGo(in <-chan int, idx int) {
for {
rwMutex.RLock() // 以读模式加锁
num := <-in
fmt.Printf("----%dth 读 go程,读出:%d\n", idx, num)
rwMutex.RUnlock() // 以读模式解锁
}
}

func writeGo(out chan<- int, idx int) {
for {
// 生成随机数
num := rand.Intn(1000)
rwMutex.Lock() // 以写模式加锁
out <- num
fmt.Printf("%dth 写go程,写入:%d\n", idx, num)
time.Sleep(time.Millisecond * 300) // 放大实验现象
rwMutex.Unlock()
}
}

func main() {
// 播种随机数种子
rand.Seed(time.Now().UnixNano())

quit := make(chan bool)         // 用于 关闭主go程的channel
ch := make(chan int)            // 用于 数据传递的 channel

for i:=0; i<5; i++ {
    go readGo(ch, i+1)
}
for i:=0; i<5; i++ {
    go writeGo(ch,i+1)
}
for{
    ;
}

}

(2)应用条件变量的代码:
package main

import (
"fmt"
"time"
"math/rand"
"sync"
)
var cond sync.Cond // 定义全局条件变量

func producer08(out chan<- int, idx int) {
for {
// 先加锁
cond.L.Lock()
// 判断缓冲区是否满
for len(out) == 5 {
cond.Wait() // 1. 2. 3.
}
num := rand.Intn(800)
out <- num
fmt.Printf("生产者%dth,生产:%d\n", idx, num)
// 访问公共区结束,并且打印结束,解锁
cond.L.Unlock()
// 唤醒阻塞在条件变量上的 消费者
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}

func consumer08(in <-chan int, idx int) {
for {
// 先加锁
cond.L.Lock()
// 判断 缓冲区是否为空
for len(in) == 0 {
cond.Wait()
}
num := <-in
fmt.Printf("-----消费者%dth,消费:%d\n",idx, num)
// 访问公共区结束后,解锁
cond.L.Unlock()
// 唤醒 阻塞在条件变量上的 生产者
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}

func main() {
product := make(chan int, 5)
rand.Seed(time.Now().UnixNano())

quit := make(chan bool)

// 指定条件变量 使用的锁
cond.L = new(sync.Mutex)                // 互斥锁 初值 0 , 未加锁状态

for i:=0; i<5; i++ {
    go producer08(product, i+1)         // 1 生产者
}
for i:=0; i<5; i++ {
    go consumer08(product, i+1)         // 3 个消费者
}

/ for {
runtime.GC()
}
/
<-quit //主go程在此一直阻塞,相当于死循环,程序必须手动退出

}

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 2
Summer

麻烦看下 markdown 的用法 6.6. LearnKu Markdown 编辑器使用指南

4年前 评论
package main

import (
    "fmt"
    "runtime"
    "time"
    "math/rand"
    "sync"
)
var cond sync.Cond // 定义全局条件变量

func producer08(out chan<- int, idx int) {
    for {
        // 先加锁
        cond.L.Lock()
        // 判断缓冲区是否满
        for len(out) == 5 {
            cond.Wait() // 1. 2. 3.
        }
        num := rand.Intn(800)
        out <- num
        fmt.Printf ("生产者 % dth,生产:% d\n", idx, num)
        // 访问公共区结束,并且打印结束,解锁
        cond.L.Unlock()
        // 唤醒阻塞在条件变量上的 消费者
        cond.Signal()
        time.Sleep(time.Millisecond * 200)
    }
}

func consumer08(in <-chan int, idx int) {
    for {
        // 先加锁
        cond.L.Lock()
        // 判断 缓冲区是否为空
        for len(in) == 0 {
            cond.Wait()
        }
        num := <-in
        fmt.Printf ("----- 消费者 % dth,消费:% d\n",idx, num)
        // 访问公共区结束后,解锁
        cond.L.Unlock()
        // 唤醒 阻塞在条件变量上的 生产者
        cond.Signal()
        time.Sleep(time.Millisecond * 200)
    }
}

func main() {
    product := make(chan int, 5)
    rand.Seed(time.Now().UnixNano())

    quit := make(chan bool)

    // 指定条件变量 使用的锁
    cond.L = new(sync.Mutex)                // 互斥锁 初值 0 , 未加锁状态

    for i:=0; i<5; i++ {
        go producer08(product, i+1)         // 1 生产者
    }
    for i:=0; i<5; i++ {
        go consumer08(product, i+1)         // 3 个消费者
    }

    select {
    case <-quit:
    }
}

你的md文档格式也太难看了吧

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!