5. 并发神奇——协程 |《 刻意学习 Golang 》

先简单梳理一下,下面要讲的内容吧,看完 Go 的并发之后 脑子里就一个东西 『协程』。进程下面有线程、线程下面有协程。准备找个时间翻译几篇官方博客讲 Go 并发内容的,继续加深理解。

PHP 中没有协程,Swoole 中实现了协程,所以 Swoole 在并发上面显得如此优异。

内容简介

  • 协程
  • 协程之间的通信 channels 双向管道
  • 缓存管道
  • 使用 range 跟 close 来遍历缓存管道
  • 多个 channels 同时使用时:select
  • 处理超时

goroutine 『协程』

goroutine 是 Go 并行设计的核心。goroutine 说到底其实就是协程,但是它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,Go 语言内部帮你实现了这些 goroutine 之间的内存共享。执行 goroutine 只需极少的栈内存(大概是 4~5 KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine 比 thread 更易用、更高效、更轻便。

goroutine 是通过 Go 的 runtime 管理的一个线程管理器。goroutine 通过 go 关键字实现了,其实就是一个普通的函数。

chanels 双向管道通信

goroutine 运行再相同的地址空间,访问共享内存必须做好同步,Go 提供了一种机制,用于协程之间进行数据通信『channels』这个类似于 Unix shell 中的双向管道。

  • 由 make 创建
  • 创建时需要指定类型
  • 可以接收、发送指定类型数据
  • 使用 <- 操作符进行接收、发送数据
  • channels 接收和发送数据都是阻塞的

下面一一解释几点

创建

ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})

<- 接收发送数据 (下面分别是两个协程:一个发送数据一个接收数据)

// goroutine 1
ch <- v  // 将 v 传给 ch
// goroutine 2
x <- ch  // 从 ch 接收 v

实际使用(下面通过两个协程来并行计算 sum 然后将数据通过 channels 传回)

func sum(a []int, ch chan int) {
    sum := 0
    for _, v := range a {
        sum += v
    }
    ch <- sum  // 将 sum 传回,可以理解为 push 到 ch 通道
}
int main() {
    a := []int{7, 2, 8, -9, 4, 0}

    ch := make(chan int)
    go sum(a[:len(a)/2], ch) // 开启一个协程 并用 ch 作为通信通道
    go sum(a[len(a/2):], ch)

    x, y := <-ch, <-ch // 接收 ch 通道中的值
    fmt.Println(x, y, x + y)
}

所谓阻塞就是需要等待某些事情完成之后才会继续执行下去否则就处于等待状态。换句话说就是:

如果 通道 ch 中没有数据那么读取数据的协程就阻塞起来等待直到有数据push入,反之如果push数据时候 ch 中还有数据,也将阻塞起来等待数据被取出再继续执行。也就是 ch 中只能存在一个数据

我们改一改上面的代码你就理解了:

// Sleep 需要 time 包
func sum(a []int, ch chan int) {
    sum := 0
    for _, v := range a {
        sum += v
    }
    fmt.Println("我将 10s 后计算完成再将数据 push 到通道 ch")
    time.Sleep(time.Second * 10)
    ch <- sum
}
int main() {
    a := []int{7, 2, 8, -9, 4, 0}

    ch := make(chan int)
    go sum(a[:len(a)/2], ch) // 开启一个协程 并用 ch 作为通信通道
    go sum(a[len(a)/2:], ch)

    fmt.Println("ch 中没有数据我将阻塞起来等数据")
    x, y := <-ch, <-ch // 接收 ch 通道中的值
    fmt.Println(x, y, x + y)
}

上面程序输出

ch 没有数据我阻塞起来等数据
我将 10s 后计算完成将数据push到通道
我将 10s 后计算完成将数据push到通道
// 这里等待 10s 因为两个协程并发执行所以只需要等 10s
17 -5 12

我想你应该明白了有关 channels 上面的五点。上面是无缓存的(只能存一个数据),下面介绍有缓存的 channels(可以存多个数据)

Buffered Channels

Go 也允许指定 channel 的缓冲大小(指定 channels 中同时存在的数据个数)创建时候指定缓存长度即可

ch := make(chan type, value) // value 为缓存长度
  • value = 0 时,channel 是无缓冲阻塞读写的
  • value > 0 时,channel 有缓冲、非阻塞的,直到写满 value 个元素才阻塞写入。

我们看一下下面这个例子,你可以在自己本机测试一下,修改相应的 value 值

Range 与 Close

当通道可以存两个或者以上的值时候,我们通过 x <- ch y <- ch ... 这就显得很不明智了,go 提供了 range 跟 close ,让我们像遍历数组一样来获取管道中的数据。

func fibonacci(n int, c chan int) {
    x, y := 1, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x + y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

记住应该在生产者的地方关闭 channel,而不是消费的地方去关闭它,这样容易引起 panic

另外一点的就是 channel 不像文件之类的,不需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束 range 循环之类的

Select

select 的出现是为了解决当我们有多个 channels 时,选择使用那个 channels,画张图更容易理解

超时

出现阻塞了,我们不能让其无休止的等待下去,所以就有了超时。
请看代码

func main () {
    ch := make(chan int)
    ch1 := make(chan int)
    o := make(chan bool)
    go func (){
        for {
            select { // ch/ch1 通道都没有数据的时候 select 会阻塞起来等待数据,两个同时有数据时候随机选一个
            case v := <- ch:
                fmt.Println("v:", v)
            case a := <- ch1:
                fmt.Println("a:", a)
            case <- time.After(2*time.Second):  // 如果前面的所有通道都阻塞了2s就执行这里
                fmt.Println("time out")
                o <- true
                break
            }
        }
    }()
    // 模拟向通道写数据
    go func () {
        for i := 0; i < 5; i++ {
            ch <- i + 1
        }
    }()

    go func () {
        for i := 0; i < 5; i++ {
            ch1 <- i + 1
        }
    }()
    // 会阻塞起来等待 o 中的数据
    <- o
}

runtime goroutine

runtime 包中有几个处理 goroutine 的函数:

  • Goexit

    退出当前执行的 goroutine,但是 defer 函数还会继续调用

  • Gosched

    让出当前 goroutine 的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。

  • NumCPU

    返回 CPU 核数量

  • NumGoroutine

    返回正在执行和排队的任务总数

  • GOMAXPROCS

    用来设置可以并行计算的 CPU 核数的最大值,并返回之前的值。

高度自律,深度思考,以勤补拙

讨论数量: 4
Ali

代码多处有错误:

sum 未看到定义
go sum(a[len(a/2):], ch)  这种写法不对吧   len(a/2) 括号 扩错位置了
ch <- v  这个 v 在 for 循环外面能使用吗?
5个月前 评论
Ali

简单的关闭通道:

// 开始发送
func startWriting(ch chan int) {
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch) // 发送完毕,关闭通道
    }()
}

// 开始接收
func startReading(ch chan int) {
    go func() {
        for {
            // 如果通道数据接收完毕,执行break
            if data, ok := <-ch; ok {
                fmt.Println(data)
            } else {
                break
            }
        }
    }()
}

// 测试关闭 channel
func TestCloseChannel1(t *testing.T) {
    ch := make(chan int)
    startWriting(ch)
    startReading(ch)
}

//输出结果如下:
0
1
2
3
4
5个月前 评论

@Ali 都用的上面的 sum,为了节约篇幅就省去了,感谢指正,这里确实两处拼写错误,已经 Fix. 再次感谢指正哈。

第一篇源码分析就快要出来了~

5个月前 评论
Ali

@jerrkill 期待源码分析.

5个月前 评论

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!