在 Go 中,正确的使用并发
Glyph Lefkowitz 最近写了一篇启蒙文章,其中他详细的说明了一些关于开发高并发软件的挑战,如果你开发软件但是没有阅读这篇问题,那么我建议你阅读一篇。这是一篇非常好的文章,现代软件工程应该拥有的丰富智慧。
从多个花絮中提取,但是如果我斗胆提出主要观点的总结,其内容就是:抢占式多任务和一般共享状态结合导致软件开发过程不可管理的复杂性, 开发人员可能更喜欢保持自己的一些理智以此避免这种不可管理的复杂性。抢占式调度对于哪些真正的并行任务是好的,但是当可变状态通过多并发线程共享时,明确的多任务合作更招人喜欢 。
尽管合作多任务,你的代码仍有可能是复杂的,它只是有机会保持可管理下一定的复杂性。当控制转移是明确一个代码阅读者至少有一些可见的迹象表明事情可能脱离正轨。没有明确标记每个新阶段是潜在的地雷:“如果这个操作不是原子操作,最后出现什么情况?”那么在每个命令之间的空间变成无尽的空间黑洞,可怕的 Heisenbugs 出现。
在过去的一年多,尽管在 Heka 上的工作(一个高性能数据、日志和指标处理引擎)已大多数使用 GO 语言开发。Go 的亮点之一就是语言本身有一些非常有用的并发原语。但是 Go 的并发性能怎么样,需要通过支持本地推理的鼓励代码镜头观察。
并非事实都是好的。所有的 Goroutine 访问相同的共享内存空间,状态默认可变,但是 Go 的调度程序不保证在上下文选择过程中的准确性。在单核设置中,Go 的运行时间进入“隐式协同工作”一类, 在 Glyph 中经常提到的异步程序模型列表选择 4。 当 Goroutine 能够在多核系统中并行运行,世事难料。
Go 不可能保护你,但是并不意味着你不能采取措施保护自己。在写代码过程中通过使用一些 Go 提供的原语,可最小化相关的抢占式调度产生的异常行为。请看下面 Glyph 示例“账号转换”代码段中 Go 接口(忽略哪些不易于最终存储定点小数的浮点数)
func Transfer(amount float64, payer, payee *Account,
server SomeServerType) error {
if payer.Balance() < amount {
return errors.New("Insufficient funds")
}
log.Printf("%s has sufficient funds", payer)
payee.Deposit(amount)
log.Printf("%s received payment", payee)
payer.Withdraw(amount)
log.Printf("%s made payment", payer)
server.UpdateBalances(payer, payee) // 假设这始终有效
return nil
}
这明显的是不安全的,如果从多个goroutine中调用的话,因为它们可能并发的从存款调度中得到相同的结果,然后一起请求更多的已取消调用的存款变量。最好是代码中危险部分不会被多个协程执行 。下面是实现这个功能的方法:
type transfer struct {
payer *Account
payee *Account
amount float64
}
var xferChan = make(chan *transfer)
var errChan = make(chan error)
func init() {
go transferLoop()
}
func transferLoop() {
for xfer := range xferChan {
if xfer.payer.Balance < xfer.amount {
errChan <- errors.New("Insufficient funds")
continue
}
log.Printf("%s has sufficient funds", xfer.payer)
xfer.payee.Deposit(xfer.amount)
log.Printf("%s received payment", xfer.payee)
xfer.payer.Withdraw(xfer.amount)
log.Printf("%s made payment", xfer.payer)
errChan <- nil
}
}
func Transfer(amount float64, payer, payee *Account,
server SomeServerType) error {
xfer := &transfer{
payer: payer,
payee: payee,
amount: amount,
}
xferChan <- xfer
err := <-errChan
if err == nil {
server.UpdateBalances(payer, payee) // Still magic.
}
return err
}
这里有更多代码,但是我们通过实现一个简单的事件循环消除并发问题。当代码首次执行时,它通过协程执行一个循环。为了转发请求,传递一个新创建的通道。结果从一个错误通道返回到循环外部。因为这些通道是没有缓冲的,他们是阻塞的,并且无论通过 Transfer 函数发送多少个并发传输请求 ,它们都将通过单一的运行事件循环连续的服务。
上面的代码也许有点笨拙,对于这样一个简单的场景一个互斥锁(mutex)也许会是一个更好的选择,但是我在尝试演示将状态操作隔离到单个协程的技术。即使有点笨拙,它对于大多数的需求已经表现的足够好了,即使是最简单的 Account 结构体实现,它也可以工作:
type Account struct {
balance float64
}
func (a *Account) Balance() float64 {
return a.balance
}
func (a *Account) Deposit(amount float64) {
log.Printf("depositing: %f", amount)
a.balance += amount
}
func (a *Account) Withdraw(amount float64) {
log.Printf("withdrawing: %f", amount)
a.balance -= amount
}
然而,Account 的实现过于幼稚似乎很愚蠢。通过不允许任何大于当前余额的提款,让 Account 结构体本身提供一些保护可能更有意义。如果我们将 Withdraw 函数改成下面内容会怎样?:
func (a *Account) Withdraw(amount float64) {
if amount > a.balance {
log.Println("Insufficient funds")
return
}
log.Printf("withdrawing: %f", amount)
a.balance -= amount
}
不幸的是,这个代码与我们原来的 Transfer 实现有同样的问题。并行执行或不切实际的上下文切换意味着我们最终可能会出现负值。 幸运的是,内部事件循环的想法在这里同样适用,也许更加整洁,因为事件循环协程可以很好地与每个人的 Account 结构实例耦合。 这个例子看起来就像下面这样:
type Account struct {
balance float64
deltaChan chan float64
balanceChan chan float64
errChan chan error
}
func NewAccount(balance float64) (a *Account) {
a = &Account{
balance: balance,
deltaChan: make(chan float64),
balanceChan: make(chan float64),
errChan: make(chan error),
}
go a.run()
return
}
func (a *Account) Balance() float64 {
return <-a.balanceChan
}
func (a *Account) Deposit(amount float64) error {
a.deltaChan <- amount
return <-a.errChan
}
func (a *Account) Withdraw(amount float64) error {
a.deltaChan <- -amount
return <-a.errChan
}
func (a *Account) applyDelta(amount float64) error {
newBalance := a.balance + amount
if newBalance < 0 {
return errors.New("Insufficient funds")
}
a.balance = newBalance
return nil
}
func (a *Account) run() {
var delta float64
for {
select {
case delta = <-a.deltaChan:
a.errChan <- a.applyDelta(delta)
case a.balanceChan <- a.balance:
// 什么也不做, 我们已经完成了我们的目的 w/ 通道释放。
}
}
}
这个 API 略有不同,Deposit 和 Withdraw 方法现在都返回了错误。它们并非直接处理它们的请求,而是把账户余额的调整量放入 deltaChan,在 run 方法运行时的事件循环中访问 deltaChan。同样的,Balance 方法通过阻塞不断地在事件循环中请求数据,直到它通过 balanceChan 接收到一个值。
须注意的要点是上述的代码,所有对结构内部数据值得直接访问和修改都是有事件循环触发的 within 代码来完成的. 如果公共 API 调用表现良好并且只使用给出的渠道同数据进行交互的话, 那么不管对公共方法进行多少并发的调用,我们都知道在任意给定的时间只会有它们之中的一个方法得到处理. 我们的时间循环代码推理起来更加容易了很多.
该模式的核心是 Heka’s 的设计. 当 Heka 启动时,它会读取配置文件并且在它自己的 go 例程中启动每一个插件. 随着时钟信号、关闭通知和其它控制信号,数据经由通道被送入插件中. 这样就鼓励了插件作者使用一种想上述事例那样的 事件循环类型的架构 来实现插件的功能.
再次,GO 不会保护你自己. 写一个同其内部数据管理和主题有争议的条件保持松耦合的 Heka 插件(或者任何架构)是完全可能的. 但是有一些需要注意的小地方,还有 Go 的 争议探测器 的自由应用程序,你可以编写的代码其行为可以预测,甚至在抢占式调度的门面代码中.
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: