大话 goroutine

goroutine本质上是大号版的异步执行句柄,比之nodejs中的单线程事件循环处理器。之所以在使用goroutine,感觉不到异步,在于golang已经封装了各种异步io操作,运行时一旦发现异步io状态发生改变,则适时进行goroutine切换。让你基本上感觉不到像基于事件编程所带来的直观上的任务执行乱序。

启动 VS 执行

goroutine 这种由运行时控制,构建于线程之上,又比线程粒度小的操作,操作系统一点也不care。通常分配给线程内存会在8M左右,goroutine则只有2kb,一个MB一个KB完全在不同级别,单单性能上就提高很多。结论就是,在任务调度上,goroutine是弱于线程的,但是在资源消耗上,goroutine则是极低的。

异步编程为了追求程序的性能,强行的将线性的程序打乱,程序变得非常的混乱与复杂。对程序状态的管理也变得异常困难。接触网络编程的同学,很容易理解启动与执行完全是两个概念。

    sum()    // 同步执行普通函数,等待它执行完毕
go sum()   // 用go关键字启动一个goroutine立即返回,它会异步执行

换而言之,普通函数是即调即用,用go关键字修饰的函数只是启动立即返回,在当前线程空闲的时候(main函数是一个特殊的goroutine,一旦它没空,意味着你的gorotuine哪怕启动了,也是作不出什么妖来。)作异步执行。更直白点,你看到的goroutine是它启动的样子,它的异步执行由运行时所决定,至于groutine与goroutine执行顺序,用俗话讲他大爷还是他大爷,回到异步执行流乱序的常态。

创建goroutine

示例使用了select 来阻塞主线程,若使用sleep,则本身是不知道多长时间是恰到好处。
太长浪费资源,太短则有可能所请求的网站还没来得及响应,主线程就退出了,达不到请求目标网站的目的。

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

func responseSize(url string) {
    fmt.Println("Step1: ", url)
    response, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Step2: ", url)
    defer response.Body.Close()

    fmt.Println("Step3: ", url)
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Step4: ", len(body))
}

func main() {
        go responseSize("https://www.stackoverflow.com")
        go responseSize("https://www.shiyanlou.com")
        go responseSize("https://www.baidu.com")

        select {}
}

执行 go run main.go。查看4个阶段响应,字节长度信息,以三个响应速度不一,响应资源不等网站为例。每次print都有io切换(即goroutine在运行时指挥下自动切换)。很明显goroutine 的输出是无序的,与goroutine的启动顺序无关。

Step1:  https://www.baidu.com
Step1:  https://www.stackoverflow.com
Step1:  https://www.shiyanlou.com
Step2:  https://www.baidu.com
Step3:  https://www.baidu.com
Step4:  227
Step2:  https://www.shiyanlou.com
Step3:  https://www.shiyanlou.com
Step4:  196083
Step2:  https://www.stackoverflow.com
Step3:  https://www.stackoverflow.com
Step4:  116044

等待goroutine执行完毕

sync.WaitGroup 在此是同步阻塞主线程,等待一组goroutine执行完毕

Add 方法向 WaitGroup 实例设置默认计数,Done 减少一个,Wait 方法负责阻塞等待其它goroutine执行完毕。

...
func responseSize(url string) {  
    defer wg.Done()  
    ...
}

var wg sync.WaitGroup      
func  main() {       
    wg.Add(3)     
    ...
    wg.Wait()    
    fmt.Println("terminate Program")    
}  

从goroutine中取值

goroutine 间通常使用channel,类似于linux中的pipe管道,来实现通信。

 package main

 import (
     "fmt"
     "io/ioutil"
     "log"
     "net/http"
     "sync"
 )

 var wg sync.WaitGroup

 func responseSize(url string, nums chan<- int) {
     defer wg.Done()
     response, err := http.Get(url)
     if err != nil {
         log.Fatal(err)
     }
     defer response.Body.Close()
     body, err := ioutil.ReadAll(response.Body)
     if err != nil {
         log.Fatal(err)
     }
     nums <- len(body)      // 将值写入通道
 }

 func main() {
     nums := make(chan int)   // 声明通道
     wg.Add(3)
     go responseSize("https://www.stackoverflow.com", nums)
     fmt.Println(<-nums)    //  发送数据
     wg.Wait()    
     close(nums)   // 关闭通道

控制goroutine执行

使用通道可以控制goroutine的执行与暂停,通道方便goroutine间通信

package main

import (
    "fmt"
    "sync"
    "time"
)

var i int

func work() {
    time.Sleep(250 * time.Millisecond)
    i++
    fmt.Println(i)
}

func routine(command <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    var status = "Play"
    for {
        select {
        case cmd := <-command:
            fmt.Println(cmd)
            switch cmd {
            case "Stop":
                return
            case "Pause":
                status = "Pause"
            default:
                status = "Play"
            }
        default:
            if status == "Play" {
                work()
            }
        }
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    command := make(chan string)
    go routine(command, &wg)

    time.Sleep(1 * time.Second)
    command <- "Pause"

    time.Sleep(1 * time.Second)
    command <- "Play"

    time.Sleep(1 * time.Second)
    command <- "Stop"

    wg.Wait()
}

运行结果

1
2
3
4
Pause
Play
5
6
7
8
9
Stop

原子函数修复条件竞争

争用条件是由于对共享资源的不同步访问而引起。原子函数提供了用于同步访问整数和指针的底层锁定机制。同步包下的atomic中的函数通过锁定对共享资源的访问来提供支持同步goroutine的支持。

package main

import (
        "fmt"
        "runtime"
        "sync"
        "sync/atomic"
)

var (
        counter int32
        wg      sync.WaitGroup
)

func main() {
        wg.Add(3)

        go increment("python")
        go increment("java")
        go increment("Golang")

        wg.Wait()
        fmt.Println("Counter:", counter)
}

func increment(name string) {
        defer wg.Done()

        for range name {
                atomic.AddInt32(&counter, 1)
                runtime.Gosched()
        }
}

使用原子函数对值时,它会强制一次有且仅一个goroutine能够完成值修改操作。当别goroutine试图调用任何原子函数时,会自动同步所对应引用的变量。

mutex定义临界区

mutex通常被用来定义临界区,确保该区域的代码一次只能被一个goroutine访问执行。

package main

import (
        "fmt"
        "sync"
)

var (
        counter int32
        wg      sync.WaitGroup
        mutex   sync.Mutex
)

func main() {
        wg.Add(3)

        go increment("Python")
        go increment("Go Programming Language")
        go increment("Golang")

        wg.Wait()
        fmt.Println("Counter:", counter)
}

func increment(lang string) {
        defer wg.Done()

        for i := 0; i < 3; i++ {
                mutex.Lock()
                {
                        fmt.Println(lang)
                        counter++
                }
                mutex.Unlock()
        }
}

去掉锁,执行下面语句,分析数据竞争

go run -race main.go

goroutine并发

解决数据争用,需要确保一次请求时仅有一个goroutine在使用临界资源。用消息传递或用锁机制。
每个gorotuine在自己的领域内都是一个独立的个体,goroutine间的地位是平等的,没有谁比谁更高贵。

消息传递,你不用向我请求,我有了消息会通知你。goroutine的启动类似于到医院挂号,院方会在合适的时候给你看病(让goroutine运行)。golang的通道则典型用了这种消息传递机制。

用锁,简单粗暴。其道理好比,将goroutine间原先对临界资源的争用,转移到对锁的抢夺上。有锁就有权使用。存在的问题在于,如果抢锁的人多,系统发锁/收锁需要将通知到即将使用临界资源的所有人。成本太高,即便改进产生的变种读写锁,条件锁等,终归太重了。

二者的区别,在于临界资源的使用方。用锁是使用者主动出手,干预共享资源。用消息传递,一切听指挥,你要用事先向上级报备,上级视情况通知你去用。典型的走过去,与送过来的关系。有点控制反转的味道。

goroutine异步,乱序执行其意义在于最大程度压榨CPU性能。带来的问题,如同nodejs回调地狱般可恶,在调式编程不易理解。为了协同各个goroutine有效工作,需要一些同步手段。如其它语言并发编程中一般,go也搞出常规的原子函数,读写锁,WaitGroup,条件锁等同步,但这已经满足不了需求(太麻烦),golang在语言级别整出了通道channel。

Context

细细思之,goroutine本质上相当于半独立的行省,各行其是,会天下大乱。goroutine与groutine之间也存在从属关系(父子共存),竞争关系(谁行谁上),互斥关系(有你没我) 等。为了协调一组goroutine共同做大事,golang1.7之后将实验性质的 context 包归并了主库,其主要功能在于产生一组context上下文树,实现了一堆有关系的goroutine协同工作,比如A协程生了B,B又产生的C程。这种子子孙孙无穷尽也的趋势,你就无法使用waitgroup(因其一开始对要启动的goroutine数目不确定)。借鉴了linux父子进程,goroutine小号版的进程形式,context包提供了这种树形关系,比如父goroutine挂了之前,会先通知其子挂,同理其子也会通知到时孙辈....

不仅如此,context 还提供了超时,截止日期,让其从属goroutine在符合待定条件下,自行消亡,而不致于一直占着茅坑不拉屎,最后出现因瞬时请求量巨大,服务端积存了大量未死亡(默认超时时长比较大)的goroutine,而耗尽了资源,一不小心将你本来弱小的服务搞死的局面。

context当然也提供了一个value接口,用于存储值。类似于vue的数据流下发,避免频繁在多个goroutine间拷贝数据。其在web请求处理方面有积极意义,好比用过微信的token同学知道,其存在token作用域(token请求存在范围限制0,用这个接口你可以存储一些带作用域的请求标识,这样在web网络通信时,就可基于请求转发到不同组的goroutine进行一系列处理。

本作品采用《CC 协议》,转载必须注明作者和本文链接
pardon110
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
开发者 @ 社科大
文章
133
粉丝
24
喜欢
100
收藏
54
排名:107
访问:8.9 万
私信
所有博文
社区赞助商