Golang 并发编程
CSP(Communicating Sequential Processes) 通信顺序进程。用于描述并发通信中的模式。golang原生的chan通道可阻塞,可锁可队列性。它的目的在于,保证并发执行的可控,即让并发的无序执行在某些条件下具备有序可控。
Goroutine本身是异步执行的代码块,它的执行顺序与其书写分配 任务顺序无关,与运行时通道,调度策略相关。
生产者 VS 消费者
通道阻塞与否事关后续代码的执行时机。通常同一个函数或goroutine内代码块是同步顺序执行的。可一旦内有通道阻塞,则意味着当前阻塞点后续代码的执行会在其他goroutine某段代码之后释放(即依赖于另一个对应通道读写),换而言之,会发生执行上下文切换,而并非上一行阻塞点执行完毕,立刻执行下一行。表现出来的样子,有小子在阻塞点位前插队。
细思极恐,若不停的切换上下文,则存在多个goroutine中有代码段被阻塞的通道切割成了多个小片段,每次切换实际上只是执行区间小片段,然后再进入其它的Groutine。这有点更像goto,执行一段跳到另外一段,未来某时又跳回来执行后续代码。
chan 本质是有序,当它处于阻塞时,是指该代码点之后的代码执行,需要等待运行时触发其状态变更为非阻塞时方可运行。下述布尔变量b在,done通道读取之后时。其打印结果完全不同,决定了其布尔值是输出头或尾。
package main
import (
"flag"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
)
type Consumer struct {
msgs *chan int
}
func NewConsumer(msgs *chan int) *Consumer {
return &Consumer{msgs: msgs}
}
// consumer 不停地读取通道
func (c *Consumer) consume(){
fmt.Println("consume: Started")
for {
msg := <-*c.msgs
fmt.Println("consume: Received:", msg)
}
}
// Producer 结构体
type Producer struct {
msgs *chan int
done *chan bool
}
func NewProducer(msgs *chan int, done *chan bool)*Producer {
return &Producer{msgs: msgs, done:done}
}
// 生产者方法
func (p *Producer)produce(max int){
fmt.Println("produce: Started")
for i := 0; i < max; i++ {
fmt.Println("produce:Sending ", i)
*p.msgs <-i
}
// 阻塞在外部得到信号退出
*p.done <- true
fmt.Println("produce: Done")
}
func main() {
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file")
memprofile := flag.String("memprofile", "", "write memory profile to `file`")
max := flag.Int("n", 5, "defines the number of messages")
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
if *cpuprofile != ""{
f,err := os.Create(*cpuprofile)
if err != nil{
log.Fatal("could not create CPU profile: ", err)
}
if err := pprof.StartCPUProfile(f);err!=nil{
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}
var msgs = make(chan int)
var done = make(chan bool)
// 使用结构体方法作为goroutine, 同一结构体资源使用必使用锁(即chan),即使访问具有有效性
go NewProducer(&msgs,&done).produce(*max)
go NewConsumer(&msgs).consume()
var b bool
fmt.Println(b)
<-done
if *memprofile != ""{
f,err := os.Create(*memprofile)
if err!= nil{
log.Fatal("could not create memory profile: ", err)
}
runtime.GC()
if err := pprof.WriteHeapProfile(f);err!=nil{
log.Fatal("could not write memory profile: ", err)
}
f.Close()
}
}
效果
D:\code-base\gomod\gott>go run "d:\code-base\gomod\gott\main.go"
false
consume: Started
produce: Started
produce:Sending 0
produce:Sending 1
consume: Received: 0
consume: Received: 1
produce:Sending 2
produce:Sending 3
consume: Received: 2
consume: Received: 3
produce:Sending 4
produce: Done
D:\code-base\gomod\gott>
死循环中的通道
之所以对通道说这么多,是想讲明不像通常所见,从上到下书写执行。而是强调在goroutine中阻塞,会迅速切换上下文,并不会陷入consume方法看起来死循环(若无通道又非goroutine,该段代码是铁定在理论上会耗尽cpu计算资源)。另外补充一下,每个goroutine的时间片是有时限。通道在某种意义上协调多个goroutine之间的执行
从另一方面也说明并非读写通道就会切换执行上下文,比如在无缓冲写通道不会阻塞后续代码,有多次写入未满缓冲通道亦不会阻塞。所以无论是有缓冲通道,还是无缓冲通道,对于goroutine而言他上下文切换,通道阻塞是其触发条件之一。
理发师问题
理发店在理发室内有一张理发椅,且在待理室有若干个供客户等候休息的凳子。当完成理发,理发师不再伺候,直接进待理室看看是否还有其他待理发的人,若无则返回理发室睡觉。每当有客人进店,他会先瞅瞅理发师在作什么。若在睡则叫他起来干活,若在理发则在待理室等候,如果没有凳子可供休息,则会选择直接离开。
package main
import (
"fmt"
"sync"
"time"
)
const(
sleeping = iota
checking
cutting
)
var stateLog = map[int]string{
0:"Sleeping",
1:"Checking",
2:"Cutting",
}
var wg *sync.WaitGroup // 潜在的客户数
type Barber struct {
name string
sync.Mutex
state int // Sleeping/Checking/Cutting
customer *Customer
}
type Customer struct {
name string
}
// 获取当前客户地址
func (c *Customer)String() string{
return fmt.Sprintf("%p", c)[7:]
}
func NewBarber()(b *Barber){
return &Barber{
name: "Sam",
state:sleeping,
}
}
// 理发师 goroutine
// 核查客户
// 理发师睡觉 等待唤醒
func barber(b *Barber, wr chan *Customer,wakers chan *Customer){
for {
b.Lock()
defer b.Unlock()
b.state = checking
b.customer=nil
// 检查待理室客户
fmt.Printf("Checking waiting room: %d\n", len(wr))
// 模拟查询时阻塞
time.Sleep(time.Microsecond*100)
// 待理室分配任务
select{
case c := <-wr:
// 1.主动理发
// 理发
HairCut(c,b)
// 解锁
b.Unlock()
default:
// 2. 唤醒理发
// 待理室人员为空,打印理发师名
fmt.Printf("Sleeping Barber - %s \n", b.customer)
// 重置理发状态
b.state = sleeping
b.customer = nil
b.Unlock()
// 唤醒者客户出现
c := <-wakers
// 理发时加锁
b.Lock()
fmt.Printf("Woken by %s\n", c)
HairCut(c,b)
b.Unlock()
}
}
}
// 理发逻辑,此段代码多个goroutine共用,注意加解锁
func HairCut(c *Customer, b *Barber){
b.state = cutting
b.customer = c
b.Unlock()
fmt.Printf("Cutting %s hair\n", c)
// 模拟理发时间
time.Sleep(time.Millisecond * 100)
b.Lock()
wg.Done()
// 用户走人
b.customer = nil
}
// 客户 goroutine
// 若待理室已满,则理发失败,否则进来的都是客户
func customer(c *Customer, b *Barber, wr chan<- *Customer, wakers chan<- *Customer){
// 客户进来
time.Sleep(time.Microsecond*50)
// 理发上锁
b.Lock()
// 当前用户,理发师状态,待理室用户数,具备唤醒权的用户娄数,唤醒人
fmt.Printf(" Customer %s checks %s barber | room: %d, w %d - customer: %s\n", c, stateLog[b.state],len(wr),len(wakers),b.customer)
switch b.state {
// 理发师在睡觉
case sleeping:
select {
// 客户成为待唤醒者
case wakers <- c:
default:
// 否则先成为待理室成员
select{
case wr <-c:
default:
// 待理室没人,则默认理发师完成工作
wg.Done()
}
}
case cutting:
select{
// 待理室有人
case wr <-c:
default:
// 待理室已满坐不下了,默认执行此多余人离开理发店
wg.Done()
}
case checking:
// 抛出 客户goroutine 不应核验发师,应由理发师核检待理室的客户
panic("Customer shouldn't check for the Barber when Barber is Checking the waiting room")
}
b.Unlock()
}
func main() {
b := NewBarber()
b.name = "Pardon"
// 模拟待理室有5把椅子
WaitingRoom := make(chan *Customer, 5)
// 存在一位唤醒者
Wakers := make(chan *Customer,1)
// 理发师处理
go barber(b, WaitingRoom, Wakers)
time.Sleep(time.Microsecond*100)
wg = new(sync.WaitGroup)
n := 10
// 增加10个计数点
wg.Add(10)
// 生成10个客户
for i := 0; i < n; i++ {
time.Sleep(time.Microsecond*50)
c := new(Customer)
// 分配十个消费goroutine
go customer(c, b, WaitingRoom, Wakers)
}
// 阻塞主线程
wg.Wait()
fmt.Println("No more customers for the day")
}
效果
D:\code-base\gomod\gott>go run "d:\code-base\gomod\gott\main.go"
Checking waiting room: 0
Sleeping Barber - <nil>
Customer 32010 checks Sleeping barber | room: 0, w 0 - customer: <nil>
Woken by 32010
Cutting 32010 hair
Customer 32020 checks Cutting barber | room: 0, w 0 - customer: 32010
Customer 4c1f0 checks Cutting barber | room: 1, w 0 - customer: 32010
Customer 32040 checks Cutting barber | room: 2, w 0 - customer: 32010
Customer 32050 checks Cutting barber | room: 3, w 0 - customer: 32010
Customer 32060 checks Cutting barber | room: 4, w 0 - customer: 32010
Customer 4c220 checks Cutting barber | room: 5, w 0 - customer: 32010
Customer 4c230 checks Cutting barber | room: 5, w 0 - customer: 32010
Customer e2000 checks Cutting barber | room: 5, w 0 - customer: 32010
Customer c0060 checks Cutting barber | room: 5, w 0 - customer: 32010
Checking waiting room: 5
Cutting 32020 hair
Checking waiting room: 4
Cutting 4c1f0 hair
Checking waiting room: 3
Cutting 32040 hair
Checking waiting room: 2
Cutting 32050 hair
Checking waiting room: 1
Cutting 32060 hair
Checking waiting room: 0
No more customers for the day
D:\code-base\gomod\gott>cls
小结
前者基于结构体的方法,后者直接用函数作为gortouine。二者在访问共用逻辑时注意加锁,多个goroutine运行通过chan来协作。
使用超时避免读堵塞,使用缓冲避免写堵塞,通道就可以在传递数据的同时,控制groutine的运行。有点像事件驱动,也有点像阻塞队列。
本作品采用《CC 协议》,转载必须注明作者和本文链接
你的头像好引人注目,总看到你。 :+1: