Goroutine 和 Channel 的的使用和一些坑以及案例分析

简单认识一下 Go 的并发模型#

简单聊一下并发模型,下一篇会单独全篇聊聊多种并发模型,以及其演进过程。

硬件发展越来越快,多核 cpu 正是盛行,为了提高 cpu 的利用率,编程语言开发者们也是各显神通,Java 的多线程,nodejs 的多进程,golang 的协程等,我想大家在平时开发中都应该在各自公司的监控平台上看到 cpu 利用率低到 5% 左右,内存利用率经常 80% 左右。

软件运行的最小单位是进程,当一个软件或者应用程序启动时我们知道操作系统为其创建了一个进程;代码运行的最小单位是线程,我们平时编程时写的代码片段在程序跑起来后一定是在一个线程中运行的,而这个线程是属于这个进程创建的。

我们经常接触到的并发模型是多线程并发模型,而 Go 语言中的并发模型是 CSP 并发模型,这里简单介绍一这两种并发模型

  1. 多线程并发模型

多线程并发模型是在一个应用程序中同时存在多个执行流,这多个执行流通过内存共享,信号量,锁等方式进行通信,CPU 在多个线程间进行上下文切换,从而达到并发执行,提高 CPU 利用率,其本质是内核态线程和用户态线程是一对一的关系

  1. CSP 并发模型

CSP 并发模型的意思将程序的执行和通信划分开来(Process 和 Channel),Process 代表了执行任务的一个单元,Channel 用来在多个单元之间进行数据交互,共享;Process 内部之间没有并发问题,所有由通信带来的并发问题都被压缩在 Channel 中,使得聚合在一起,得到了约束,同步,竞争聚焦在 Channel 上,Go 就是基于这种并发模型的,Go 在线程的基础上实现了这一套并发模型(MPG),线程之上虚拟出了协程的概念,一个协程代表一个 Process, 但在操作系统级别调度的基本单位依然是线程,只是 Go 自己实现了一个调度器,用来管理协程的调度,M(Machine)代表一个内核线程,P(Process)代表一个调度器,G (Goroutine) 代表一个协程,其本质是内核线程和用户态线程成了多对多的关系

Goroutine 和 Channel 的使用#

如下代码运行起来,Go 的主协程就启动起来了

package main

func main(){
    fmt.Println("主协程启动")
}

如何通过代码启动一个新的协程呢,通过 go 关键字启动一个新的协程,主协程启动后,等待新的协程启动执行

package main

func main(){
    var wg sync.WaitGroup
    wg.Add(1)
    go func(){
        defer wg.Done()
        fmt.Println("新的协程启动")
    }()
    fmt.Println("主协程启动")
    //等待新的协程运行完毕,程序才退出
    wg.Wait()
}

channel 一些介绍

//通道分为两类: 
//无缓冲区的通道
c := make(chan int)
//有缓冲区的通道
c := make(chan int,10)
//通道的操作
//往通道写入数据
c <- 1
//从通道读取数据,
//temp是读取到的值
//ok是返回此通道是否已被关闭
temp,ok := <- c
//关闭通道
close(c)
//遍历通道
for v :=  range c{
}

两个协程之间如何通信呢?,那就是通过 channel 通道来实现,channel 创建时可以指定是否带有缓冲区,如果不带缓冲区,那么当一个协程往通道中写入一个数据的时候,另一个协程必须读取,否则第一个协程就只能出去阻塞状态(也就是生产一个,消费一个),带有缓冲区的 channel 就理解为一个队列或者仓库,可以一下子生产很多个先暂存起来,慢慢消费。

package main

func main(){
    var wg sync.WaitGroup
    wg.Add(1)
    //不带缓冲区的channel
    c := make(chan string)
    go func(){        
        defer func(){                
            wg.Done()              
        }()        
        for{                
            //从通道中取出数据                
            temp := <- c               
            if temp == "写入数据3" {                        
                break               
            }        
        }
    }()
    //主协程循环往通道写入值
    for i:=1;i<4;i++{        
        c <- "写入数据"+strconv.Itoa(i)
    }
    //等待新的协程运行完毕,程序才退出
    wg.Wait()
}
//最终程序执行结果
/**
写入数据1
写入数据2
写入数据3
*/

我们再来看一个用 Goroutine 和 Channel 实现的生产者消费者例子

/**生产者消费者的例子*/
func ProductAndConsumer() {        
    wg := sync.WaitGroup{}        
    wg.Add(1)        
    //带有缓冲区的通道
    cint := make(chan int, 10)        
    go func() {                
        //product  ,循环往通道中写入一个元素              
        for i := 0; i < 100; i++ {                       
            cint <- i                        
        }        
        //关闭通道
        close(cint)        
     }()        
    go func() {                
        defer wg.Done()                
        //consumer   遍历通道消费元素并打印        
        for temp := range cint {                        
            fmt.Println(temp) 
            //len函数可以查看当前通道元素个数
            fmt.Println("当前通道元素个数",len(cint))
        }        
    }()        
    wg.Wait()
}

使用中的一些坑#

向一个已关闭的 channel 写入数据会报错,但从一个已关闭的 channel 读取数据不会报错

package main

func main(){
    c := make(chan int,10)
    close(c)
    c <- 1
}
//结果如下
panic: send on closed channel

主程序在读取一个没有生产者的 channel 时会被判断为死锁,如果是在新开的协程中是没有问题的,同理主程序在往没有消费者的协程中写入数据时也会发生死锁

package main

func main(){
    c := make(chan int,10)
    //从一个永远都不可能有值的通道中读取数据,会发生死锁,因为会阻塞主程序的执行
    <- c
}
func main(){
    c := make(chan int,10)
    //主程序往一个没有消费者的通道中写入数据时会发生死锁, 因为会阻塞主程序的执行
    c <- 1
}
//结果如下
fatal error: all goroutines are asleep - deadlock!

当通道被两个协程操作时,如果一方因为阻塞导致另一放阻塞则会发生死锁,如下代码创建两个通道,开启两个协程 (主协程和子协程),主协程从 c2 读取数据,子协程往 c1,c2 写入数据,因为 c1,c2 都是无缓冲通道,所以往 c1 写时会阻塞,从 c2 读取时也会会阻塞,从而发生死锁

package main

func main(){
    c1 := make(chan int)
    c2 := make(chan int)
    go func(){
        c1 <- 1
        c2 <- 2
    }()
    <- c2
}
//结果
fatal error: all goroutines are asleep - deadlock!

通道死锁的一些注意事项,其实上面的死锁情况主要分为如下两种

  1. 不要往一个已经关闭的 channel 写入数据
  2. 不要通过 channel 阻塞主协程

一些经典案例看看 Gorouting 和 Chanel 的魅力#

先说说 Go 中 select 的概念,一个 select 语句用来选择哪个 case 中的发送或接收操作可以被立即执行。它类似于 switch 语句,但是它的 case 涉及到 channel 有关的 I/O 操作,或者换一种说法,select 就是用来监听和 channel 有关的 IO 操作,当 IO 操作发生时,触发相应的动作,基本用法如下:


//select基本用法
select {
    case <- c1:
    // 如果c1成功读到数据,则进行该case处理语句
    case c2 <- 1:
    // 如果成功向c2写入数据,则进行该case处理语句
    default:
    // 如果上面都没有成功,则进入default处理流程
}

案例一,多个不依赖的服务可以并发执行

package main

func queryUserById(id int)chan string{
    c := make(chan string)
    go func(){
        c <- "姓名"+strconv.Itoa(id)
    }()
    return c
}

func main(){
    //三个协程同时并发查询,缩小执行时间,
    //本来一次查询需要1秒,顺序执行就得3秒,
    //现在并发执行总共1秒就执行完成
    name1 := queryUserById(1)
    name2 := queryUserById(2)
    name3 := queryUserById(3)
    //从通道中获取执行结果
    <- name1
    <- name2
    <- name3
}

案例二:select 监听通道合并多个通道的值到一个通道


package main

func queryUserById(id int)chan string{
    c := make(chan string)
    go func(){
        c <- "姓名"+strconv.Itoa(id)
    }()
    return c
}

func main(){    
    c1, c2, c3 := queryUserById(1), queryUserById(2), queryUserById(3)
    c := make(chan string)
    // 开一个goroutine监视各个信道数据输出并收集数据到信道c
    go func() { 
        for {
            // 监视c1, c2, c3的流出,并全部流入信道c
            select {
               case       
                   v1 := <- c1:        
                   c <- v1
               case       
                   v2 := <- c2:        
                   c <- v2
               case       
                   v3 := <- c3:       
                   c <- v3
            }
        }
    }()
    // 阻塞主线,取出信道c的数据
    for i := 0; i < 3; i++ {
         // 从打印来看我们的数据输出并不是严格的顺序
        fmt.Println(<-c) 
    }
}

案例三:结束标志

func main() {

    c, quit := make(chan int), make(chan int)
    go func() {
        c <- 2  // 添加数据
        quit <- 1 // 发送完成信号
    } ()
    for is_quit := false; !is_quit; {
        // 监视信道c的数据流出
        select { 
            case v := <-c: fmt.Printf("received %d from c", v)
            case <-quit: is_quit = true 
            // quit信道有输出,关闭for循环
        }
    }}

Golang

本作品采用《CC 协议》,转载必须注明作者和本文链接
那小子阿伟