Golang 并发编程

CSP(Communicating Sequential Processes) 通信顺序进程。用于描述并发通信中的模式。golang原生的chan通道可阻塞,可锁可队列性。它的目的在于,保证并发执行的可控,即让并发的无序执行在某些条件下具备有序可控。

Goroutine本身是异步执行的代码块,它的执行顺序与其书写分配 任务顺序无关,与运行时通道,调度策略相关。

生产者 VS 消费者

通道阻塞与否事关后续代码的执行时机。通常同一个函数或goroutine内代码块是同步顺序执行的。可一旦内有通道阻塞,则意味着当前阻塞点后续代码的执行会在其他goroutine某段代码之后释放(即依赖于另一个对应通道读写),换而言之,会发生执行上下文切换,而并非上一行阻塞点执行完毕,立刻执行下一行。表现出来的样子,有小子在阻塞点位前插队。

细思极恐,若不停的切换上下文,则存在多个goroutine中有代码段被阻塞的通道切割成了多个小片段,每次切换实际上只是执行区间小片段,然后再进入其它的Groutine。这有点更像goto,执行一段跳到另外一段,未来某时又跳回来执行后续代码。

chan 本质是有序,当它处于阻塞时,是指该代码点之后的代码执行,需要等待运行时触发其状态变更为非阻塞时方可运行。下述布尔变量b在,done通道读取之后时。其打印结果完全不同,决定了其布尔值是输出头或尾。

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "runtime"
    "runtime/pprof"
)

type Consumer struct {
    msgs *chan int
}


func NewConsumer(msgs *chan int) *Consumer {
    return &Consumer{msgs: msgs}
}

// consumer 不停地读取通道
func (c *Consumer) consume(){
    fmt.Println("consume: Started")
    for {
        msg := <-*c.msgs
        fmt.Println("consume: Received:", msg)
    }
}

// Producer 结构体
type Producer struct {
    msgs *chan int
    done *chan bool
}

func NewProducer(msgs *chan int, done *chan bool)*Producer {
    return &Producer{msgs: msgs, done:done}
}

// 生产者方法
func (p *Producer)produce(max int){
    fmt.Println("produce: Started")
    for i := 0; i < max; i++ {
        fmt.Println("produce:Sending ", i)
        *p.msgs <-i
    }
    // 阻塞在外部得到信号退出 
    *p.done <- true
    fmt.Println("produce: Done")
}


func main() {
    cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file")    
    memprofile := flag.String("memprofile", "", "write memory profile to `file`")
    max := flag.Int("n", 5, "defines the number of messages")

    flag.Parse()

    runtime.GOMAXPROCS(runtime.NumCPU())

    if *cpuprofile != ""{
        f,err := os.Create(*cpuprofile)
        if err != nil{
            log.Fatal("could not create CPU profile: ", err)
        }
        if err := pprof.StartCPUProfile(f);err!=nil{
            log.Fatal("could not start CPU profile: ", err)
        }
        defer pprof.StopCPUProfile()
    }

    var msgs = make(chan int)
    var done = make(chan bool)

    // 使用结构体方法作为goroutine, 同一结构体资源使用必使用锁(即chan),即使访问具有有效性
    go NewProducer(&msgs,&done).produce(*max)
    go NewConsumer(&msgs).consume()

    var b bool
    fmt.Println(b)
    <-done

    if *memprofile != ""{
        f,err := os.Create(*memprofile)
        if err!= nil{
            log.Fatal("could not create memory profile: ", err)
        }
        runtime.GC()
        if err := pprof.WriteHeapProfile(f);err!=nil{
            log.Fatal("could not write memory profile: ", err)
        }
        f.Close()
    }
}

效果


D:\code-base\gomod\gott>go run "d:\code-base\gomod\gott\main.go"
false
consume: Started
produce: Started
produce:Sending  0
produce:Sending  1
consume: Received: 0
consume: Received: 1
produce:Sending  2
produce:Sending  3
consume: Received: 2
consume: Received: 3
produce:Sending  4
produce: Done

D:\code-base\gomod\gott>

死循环中的通道

之所以对通道说这么多,是想讲明不像通常所见,从上到下书写执行。而是强调在goroutine中阻塞,会迅速切换上下文,并不会陷入consume方法看起来死循环(若无通道又非goroutine,该段代码是铁定在理论上会耗尽cpu计算资源)。另外补充一下,每个goroutine的时间片是有时限。通道在某种意义上协调多个goroutine之间的执行

从另一方面也说明并非读写通道就会切换执行上下文,比如在无缓冲写通道不会阻塞后续代码,有多次写入未满缓冲通道亦不会阻塞。所以无论是有缓冲通道,还是无缓冲通道,对于goroutine而言他上下文切换,通道阻塞是其触发条件之一。

理发师问题

理发店在理发室内有一张理发椅,且在待理室有若干个供客户等候休息的凳子。当完成理发,理发师不再伺候,直接进待理室看看是否还有其他待理发的人,若无则返回理发室睡觉。每当有客人进店,他会先瞅瞅理发师在作什么。若在睡则叫他起来干活,若在理发则在待理室等候,如果没有凳子可供休息,则会选择直接离开。

package main

import (
    "fmt"
    "sync"
    "time"
)

const(
    sleeping = iota
    checking
    cutting
)

var stateLog = map[int]string{
    0:"Sleeping",
    1:"Checking",
    2:"Cutting",
}

var wg *sync.WaitGroup // 潜在的客户数

type Barber struct {
    name string
    sync.Mutex
    state int // Sleeping/Checking/Cutting
    customer *Customer
}

type Customer struct {
    name string
}

// 获取当前客户地址
func (c *Customer)String() string{
    return fmt.Sprintf("%p", c)[7:]
}

func NewBarber()(b *Barber){
    return &Barber{
        name: "Sam",
        state:sleeping,
    }
}

// 理发师 goroutine
// 核查客户
// 理发师睡觉 等待唤醒
func barber(b *Barber, wr chan *Customer,wakers chan *Customer){
    for {
        b.Lock()
        defer b.Unlock()
        b.state = checking
        b.customer=nil

        // 检查待理室客户
        fmt.Printf("Checking waiting room: %d\n", len(wr))
        // 模拟查询时阻塞
        time.Sleep(time.Microsecond*100)

        // 待理室分配任务
        select{
        case c := <-wr:
            // 1.主动理发
            // 理发
            HairCut(c,b)
            // 解锁
            b.Unlock()
        default:
            // 2. 唤醒理发
            // 待理室人员为空,打印理发师名
            fmt.Printf("Sleeping Barber - %s \n", b.customer)
            // 重置理发状态
            b.state = sleeping
            b.customer = nil
            b.Unlock()
            // 唤醒者客户出现
            c := <-wakers
            // 理发时加锁
            b.Lock()
            fmt.Printf("Woken by %s\n", c)
            HairCut(c,b)
            b.Unlock()
        }
    }
}

// 理发逻辑,此段代码多个goroutine共用,注意加解锁
func HairCut(c *Customer, b *Barber){
    b.state = cutting
    b.customer = c
    b.Unlock()

    fmt.Printf("Cutting  %s hair\n", c)
    // 模拟理发时间
    time.Sleep(time.Millisecond * 100)
    b.Lock()
    wg.Done()
    // 用户走人
    b.customer = nil
}

// 客户 goroutine
// 若待理室已满,则理发失败,否则进来的都是客户
func customer(c *Customer, b *Barber, wr chan<- *Customer, wakers chan<- *Customer){
    // 客户进来 
    time.Sleep(time.Microsecond*50)
    // 理发上锁
    b.Lock()
    // 当前用户,理发师状态,待理室用户数,具备唤醒权的用户娄数,唤醒人
    fmt.Printf(" Customer %s checks %s barber | room: %d, w %d - customer: %s\n", c, stateLog[b.state],len(wr),len(wakers),b.customer)
    switch b.state {
    // 理发师在睡觉
    case sleeping:
        select {
        // 客户成为待唤醒者
        case wakers <- c:
        default:
            // 否则先成为待理室成员
            select{
            case wr <-c:
            default:
                // 待理室没人,则默认理发师完成工作
                wg.Done()
            }
        }
    case cutting:
        select{
            // 待理室有人
        case wr <-c:
        default:
            // 待理室已满坐不下了,默认执行此多余人离开理发店
            wg.Done()
        }
    case checking:
        // 抛出 客户goroutine 不应核验发师,应由理发师核检待理室的客户
        panic("Customer shouldn't check for the Barber when Barber is Checking the waiting room")
    }
    b.Unlock()
}

func main() {
    b := NewBarber()
    b.name = "Pardon"
    // 模拟待理室有5把椅子
    WaitingRoom := make(chan *Customer, 5)
    // 存在一位唤醒者
    Wakers := make(chan *Customer,1)

    // 理发师处理
    go barber(b, WaitingRoom, Wakers)

    time.Sleep(time.Microsecond*100)
    wg = new(sync.WaitGroup)
    n := 10
    // 增加10个计数点
    wg.Add(10)

    // 生成10个客户
    for i := 0; i < n; i++ {
        time.Sleep(time.Microsecond*50)
        c := new(Customer)
        // 分配十个消费goroutine
        go customer(c, b, WaitingRoom, Wakers)
    }

    // 阻塞主线程
    wg.Wait()
    fmt.Println("No more customers for the day")
}

效果

D:\code-base\gomod\gott>go run "d:\code-base\gomod\gott\main.go"
Checking waiting room: 0
Sleeping Barber - <nil>
 Customer 32010 checks Sleeping barber | room: 0, w 0 - customer: <nil>
Woken by 32010
Cutting  32010 hair
 Customer 32020 checks Cutting barber | room: 0, w 0 - customer: 32010
 Customer 4c1f0 checks Cutting barber | room: 1, w 0 - customer: 32010
 Customer 32040 checks Cutting barber | room: 2, w 0 - customer: 32010
 Customer 32050 checks Cutting barber | room: 3, w 0 - customer: 32010
 Customer 32060 checks Cutting barber | room: 4, w 0 - customer: 32010
 Customer 4c220 checks Cutting barber | room: 5, w 0 - customer: 32010
 Customer 4c230 checks Cutting barber | room: 5, w 0 - customer: 32010
 Customer e2000 checks Cutting barber | room: 5, w 0 - customer: 32010
 Customer c0060 checks Cutting barber | room: 5, w 0 - customer: 32010
Checking waiting room: 5
Cutting  32020 hair
Checking waiting room: 4
Cutting  4c1f0 hair
Checking waiting room: 3
Cutting  32040 hair
Checking waiting room: 2
Cutting  32050 hair
Checking waiting room: 1
Cutting  32060 hair
Checking waiting room: 0
No more customers for the day

D:\code-base\gomod\gott>cls

小结

前者基于结构体的方法,后者直接用函数作为gortouine。二者在访问共用逻辑时注意加锁,多个goroutine运行通过chan来协作。
使用超时避免读堵塞,使用缓冲避免写堵塞,通道就可以在传递数据的同时,控制groutine的运行。有点像事件驱动,也有点像阻塞队列。

本作品采用《CC 协议》,转载必须注明作者和本文链接
pardon110
讨论数量: 1

你的头像好引人注目,总看到你。 :+1:

3天前 评论

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!