《Go 语言程序设计》读书笔记 (五) 协程与通道

Goroutines#

  • 在 Go 语言中,每一个并发的执行单元叫作 goroutine。设想一个程序中有两个函数,假设两个函数没有相互之间的调用关系。一个线性的程序会先调用其中的一个函数,然后再调用另一个。如果程序中包含多个 goroutine,对两个函数的调用则可能发生在同一时刻。
  • 当一个程序启动时,其 main 函数即在一个单独的 goroutine 中运行,我们叫它 main goroutine。新的 goroutine 会用 go 语句来创建。在语法上,go 语句是在一个普通的函数或方法调用前加上关键字 go。go 语句会使其语句中的函数在一个新创建的 goroutine 中运行。而 go 语句本身会迅速地完成。
f()    // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
  • 主函数返回时,所有的 goroutine 都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个 goroutine 来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过 goroutine 之间的通信来让一个 goroutine 请求其它的 goroutine,并被请求的 goroutine 自行结束执行。

Channel#

  • 如果说 goroutine 是 Go 语言程序的并发体的话,那么 channels 它们之间的通信机制。它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channel 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。
  • 使用内置的 make 函数,我们可以创建一个 channel:
ch := make(chan int)
  • 和 map 类似,channel 也一个对 make 函数创建的底层数据结构的引用。当我们复制一个 channel 或把 channel 用于函数参数传递时,我们只是拷贝了一个 channel 引用,因此调用者和被调用者将引用同一个 channel 对象。和其它的引用类型一样,channel 的零值也是 nil。

  • channel 有发送和接收两个主要操作,都是通信行为。一个发送语句将一个值从一个 goroutine 通过 channel 发送到另一个执行接收操作的 goroutine。发送和接收两个操作都是用 <- 运算符。在发送语句中,<- 运算符分割 channel 和要发送的值。在接收语句中,<- 运算符写在 channel 对象之前。一个不使用接收结果的接收操作也是合法的。

ch <- x  // 发送消息
x = <-ch // 从 channel 中接收消息
<-ch     // 从 channel 接收并丢弃消息
  • Channel 还支持 close 操作,用于关闭 channel,随后对基于该 channel 的任何发送操作都将导致 panic 异常。对一个已经被 close 过的 channel 执行接收操作依然可以接收到之前已经成功发送的数据;如果 channel 中已经没有数据的话将产生一个零值的数据。

    使用内置的 close 函数就可以关闭一个 channel:

  close(ch)

以最简单方式调用 make 函数创建的时一个无缓冲的 channel,但是我们也可以指定第二个整形参数,对应 channel 的容量。如果 channel 的容量大于零,那么该 channel 就是带缓冲的 channel。

  ch = make(chan int)    // unbuffered channel
  ch = make(chan int, 0) // unbuffered channel
  ch = make(chan int, 3) // buffered channel with capacity 3

无缓冲 channel#

  • 一个基于无缓冲 Channel 的发送操作将导致发送者 goroutine 阻塞,直到另一个 goroutine 在相同的 Channel 上执行接收操作,当发送的值通过 Channel 成功传输之后,两个 goroutine 可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者 goroutine 也将阻塞,直到有另一个 goroutine 在相同的 Channel 上执行发送操作。

  • 下面的程序在 main 函数的 goroutine 中将标准输入复制到 server,因此当客户端程序关闭标准输入时,后台 goroutine 可能依然在工作。我们需要让主 goroutine 等待后台 goroutine 完成工作后再退出,我们使用了一个 channel 来同步两个 goroutine,在后台 goroutine 返回之前,它先打印一个日志信息,然后向 done 对应的 channel 发送一个值。主 goroutine 在退出前先等待从 done 对应的 channel 接收一个值。因此,总是可以在程序退出前正确输出 “done” 消息。

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}
  • 基于 channel 发送消息有两个重要方面。首先每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,我们将它称为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个 goroutine 之间的同步,这时候我们可以用 struct{} 空结构体作为 channels 元素的类型,虽然也可以使用 bool 或 int 类型实现同样的功能,done <- 1 语句也比 done <- struct{}{} 更短。

  • 如果发送者知道,没有更多的值需要发送到 channel 的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的 close 函数来关闭 channel 实现:

close(naturals)
  • 当一个 channel 被关闭后,再向该 channel 发送数据将导致 panic 异常。当一个被关闭的 channel 中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。
  • 接收 channel 语句中可以额外增加第二个值,标识 chnnel 是否已经关闭
x, ok := <-naturals
  • Go 语言的 range 循环可直接在 channels 上面迭代。使用 range 循环是上面处理模式的简洁语法,它依次从 channel 接收数据,当 channel 被关闭并且没有值可接收时跳出循环。

在下面的程序中,我们的计数器 goroutine 只生成 100 个含数字的序列,然后关闭 naturals 对应的 channel,这将导致计算平方数的 squarer 对应的 goroutine 可以正常终止循环并关闭 squares 对应的 channel。(在一个更复杂的程序中,可以通过 defer 语句关闭对应的 channel。)最后,主 goroutine 也可以正常终止循环并退出程序。

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}
  • 试图重复关闭一个 channel 将导致 panic 异常,试图关闭一个 nil 值的 channel 也将导致 panic 异常。关闭一个 channels 还会触发一个广播机制,我们将在后面讨论。

单方向的 channel#

  • 当一个 channel 作为一个函数参数是,它一般总是被专门用于只发送或者只接收。

    为了表明这种意图并防止被滥用,Go 语言的类型系统提供了单方向的 channel 类型,分别用于只发送或只接收的 channel。类型 chan<- int 表示一个只发送 int 的 channel,只能发送不能接收。相反,类型 <-chan int 表示一个只接收 int 的 channel,只能接收不能发送。(箭头 <- 和关键字 chan 的相对位置表明了 channel 的方向。)这种限制将在编译期检测。

  • 因为关闭操作只用于断言不再向 channel 发送新的数据,所以只有在发送者所在的 goroutine 才会调用 close 函数,因此对一个只接收的 channel 调用 close 将是一个编译错误。

这是改进的版本,这一次参数使用了单方向 channel 类型:

func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

调用 counter (naturals) 将导致将 chan int 类型的 naturals 隐式地转换为 chan<- int 类型只发送型的 channel。调用 printer (squares) 也会导致相似的隐式转换,这一次是转换为 <-chan int 类型只接收型的 channel。任何双向 channel 向单向 channel 变量的赋值操作都将导致该隐式转换。

带缓冲的 channel#

带缓存的 Channel 内部持有一个元素队列。队列的最大容量是在调用 make 函数创建 channel 时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存 Channel。图 8.2 是 ch 变量对应的 channel 的图形表示形式。

ch = make(chan string, 3)

img

向缓存 Channel 的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个 goroutine 执行接收操作而释放了新的队列空间。相反,如果 channel 是空的,接收操作将阻塞直到有另一个 goroutine 执行发送操作而向队列插入元素。

我们可以在无阻塞的情况下连续向新创建的 channel 发送三个值:

ch <- "A"
ch <- "B"
ch <- "C"

此刻,channel 的内部缓存队列将是满的(图 8.3),如果有第四个发送操作将发生阻塞。

img

如果我们接收一个值,

fmt.Println(<-ch) // "A"

那么 channel 的缓存队列将不是满的也不是空的(图 8.4),因此对该 channel 执行的发送或接收操作都不会发送阻塞。通过这种方式,channel 的缓存队列解耦了接收和发送的 goroutine。

img

在某些特殊情况下,程序可能需要知道 channel 内部缓存的容量,可以用内置的 cap 函数获取:

fmt.Println(cap(ch)) // "3"

同样,对于内置的 len 函数,如果传入的是 channel,那么将返回 channel 内部缓存队列中有效元素的个数。因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助。

fmt.Println(len(ch)) // "2"

在继续执行两次接收操作后 channel 内部的缓存队列将又成为空的,如果有第四个接收操作将发生阻塞:

fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"

下面的例子展示了一个使用了带缓存 channel 的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置。它们分别将收到的响应发送到带缓存 channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此 mirroredQuery 函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个 goroutines 并发地向同一个 channel 发送数据,或从同一个 channel 接收数据都是常见的用法。)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

如果我们使用了无缓存的 channel,那么两个慢的 goroutines 将会因为没有人接收而被永远卡住。这种情况,称为 goroutines 泄漏,这将是一个 BUG。和垃圾变量不同,泄漏的 goroutines 并不会被自动回收,因此确保每个不再需要的 goroutine 能正常退出是重要的。

关于无缓存或带缓存 channel 之间的选择,或者是带缓存 channel 的容量大小的选择,都可能影响程序的正确性。无缓存 channel 更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存 channel,这些操作是解耦的。同样,即使我们知道将要发送到一个 channel 的信息的数量上限,创建一个对应容量大小带缓存 channel 也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓冲将导致程序死锁。

用带缓冲的 channel 控制并发数量#

此外对于 buffered channel,我们可以用一个有容量限制的 buffered channel 来控制并发,这类似于操作系统里的计数信号量概念。从概念上讲,channel 里的 n 个空槽代表 n 个可以处理内容的 token (通行证),从 channel 里接收一个值会释放其中的一个 token,并且生成一个新的空槽位。这样保证了在没有接收介入时最多有 n 个发送操作。(这里可能我们拿 channel 里填充的槽来做 token 更直观一些,不过还是这样吧~)。由于 channel 里的元素类型并不重要,我们用一个零值的 struct {} 来作为其元素。

下面的 crawl 函数,将对 links.Extract 的调用操作用获取、释放 token 的操作包裹起来,来确保同一时间对其只有 20 个调用。信号量数量和其能操作的 IO 资源数量应保持接近。

// goroutine获取token后,可以进行抓取操作,如果满20了
// 那么 goroutine 会等到有获取 token 后再去执行
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
    fmt.Println(url)
    tokens <- struct{}{} // 获取 token
    list, err := links.Extract(url)
    <-tokens // 释放 token
    if err != nil {
        log.Print(err)
    }
    return list
}

并发循环的一个典型示例#

在并发循环中为了知道最后一个 goroutine 什么时候结束 (最后一个结束并不一定是最后一个开始),我们需要一个递增的计数器,在每一个 goroutine 启动时加一,在 goroutine 退出时减一。这需要一种特殊的计数器,这个计数器需要在多个 goroutine 操作时做到安全并且提供在其减为零之前一直等待的一种方法。这种计数类型被称为 sync.WaitGroup,下面的代码就用到了这种方法:

// makeThumbnails6为从通道中接收到的每个文件创建缩略图。
// 返回每个创建的缩略图所占的自己数。
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

注意 Add 和 Done 方法的不对策。Add 是为计数器加一,必须在 worker goroutine 开始之前调用,而不是在 goroutine 中;否则的话我们没办法确定 Add 是在”closer” goroutine 调用 Wait 之前被调用。并且 Add 还有一个参数,但 Done 却没有任何参数;其实它和 Add (-1) 是等价的。我们使用 defer 来确保计数器即使是在出错的情况下依然能够正确地被减掉。上面的程序代码结构是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。

select 多通道复用#

select 语句的一般形式,和 switch 语句稍微有点相似。也会有几个 case 和最后的 default 选择支。每一个 case 代表一个通信操作 (在某个 channel 上进行发送或者接收) 并且会包含一些语句组成的一个语句块。

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

一个接收表达式可能只包含接收表达式自身 (译注:不把接收到的值赋值给变量什么的),就像上面的第一个 case,或者包含在一个简短的变量声明中,像第二个 case 里一样;第二种形式让你能够在当前 case 块中引用接收到的值。

select 会等待 case 中有能够执行的 case 时去执行。当条件满足时,select 才会去通信并执行 case 之后的语句;这时候其它通信是不会执行的,当没有 case 准备好时 select 会去执行 default 之后的语句,使用 default 分支可以避免 select 的阻塞。一个没有任何 case 的 select 语句写作 select {},会永远地等待下去。

下面这个例子更微秒。ch 这个 channel 的 buffer 大小是 1,所以会交替的为空或为满,所以只有一个 case 可以进行下去,无论 i 是奇数或者偶数,它都会打印 0 2 4 6 8。

ch := make(chan int, 1)
for i := 0; i < 10; i++ {
    select {
    case x := <-ch:
        fmt.Println(x) // "0" "2" "4" "6" "8"
    case ch <- i:
    }
}

如果多个 case 同时就绪时,select 会随机地选择一个执行,这样来保证每一个 channel 都有平等的被 select 的机会。增加上面例子的 buffer 大小会使其输出变得不确定,因为当 buffer 既不为满也不为空时,select 语句的执行情况就像是抛硬币的行为一样是随机的。

本作品采用《CC 协议》,转载必须注明作者和本文链接
公众号:网管叨 bi 叨 | Golang、Laravel、Docker、K8s 等学习经验分享
未填写
文章
113
粉丝
368
喜欢
487
收藏
317
排名:34
访问:20.4 万
私信
所有博文
社区赞助商