实用的context
context是什么
context翻译过来是上下文的意思,在go里面是用来给goroutine传递上下文的,可以传递取消信号、超时、k-v等等,比如说我们经常用它来存储客户端传过来的原子参数,token、cookie等等
context的使用
上文说了,context可以传递cv,设置goroutine取消、超时等,我们用一些简单代码来看看context是怎么实现这些功能的
传递k-v
func TestCommon(t *testing.T) {
ctx := setTokenContext(context.Background())
wg := new(sync.WaitGroup)
wg.Add(2)
go run(ctx, wg, 1)
go run(ctx, wg, 2)
wg.Wait()
}
func getToken(ctx context.Context) string {
return ctx.Value("token").(string) // 从context中获取token,因为value方法取出来是interface{},所以需要类型断言成string
}
func setTokenContext(ctx context.Context) context.Context{
return context.WithValue(ctx, "token", "abc-abc-abc") // context中设置token
}
func run(ctx context.Context, wg *sync.WaitGroup, num int) {
//todo 业务
defer wg.Done()
token := getToken(ctx)
fmt.Printf("这里是第%d个goroutian, token为:%s \n", num, token)
}
上面这段代码我们现创建了一个context,然后通过WithValue函数往context中存储了token数据,然后开启两个groutine去获取token,最终的打印结果为下图
关闭goroutine
通常我们可以通过context发送一个信号来关闭我们需要关闭的goroutine
func TestCommon(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go run(ctx, 1) //goroutian1
go run(ctx, 2) //goroutian2
time.Sleep(time.Second)
cancel() // 发送取消信号
time.Sleep(time.Second)
}
func run(ctx context.Context, num int) {
for {
select {
case <- ctx.Done():
fmt.Printf("groutine%d被关闭了\n", num)
return
default:
//执行正常逻辑
fmt.Printf("groutine%d正在执行\n", num)
}
time.Sleep(time.Second) // 每秒钟执行一次
}
}
先调用了一下WithCancel,可以看到函数返回了一个context,还有一个cancel
函数,这个cancel函数其实就是发送一个信号给ctx.Done()
返回的channel
,所以我们通过监听这个channel就可以友好的关闭goroutine,具体的实现逻辑等下文源码解析时会说
打印结果
控制超时
可以使用context来控制groutian的执行时间,超过某个时间就关闭掉,例如http超时,在golang里就是通过context实现的,先看示例代码:
func TestCommon(t *testing.T) {
//ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10 * time.Second))
ctx, _ := context.WithTimeout(context.Background(), time.Second * 3)
go run(ctx) //goroutian1
time.Sleep(5 * time.Second)
}
func run(ctx context.Context) {
for {
select {
case <- ctx.Done():
fmt.Printf("groutine超时被关闭了\n")
return
default:
//执行正常逻辑
fmt.Printf("groutine正在执行\n")
}
time.Sleep(time.Second) // 每秒钟执行一次
}
}
先调用了WithTimeout
函数,返回两个数据,第一个是context,第二个和上面取消的逻辑一样,也会返回一个cancel
函数,所以,也是可以手动cancel的,这里主要说超时,所以先把它过滤掉。WithTimeout函数第二个参数是个时间,time.xx
,这里可以传秒、分钟、小时等等等,传多少时间进去,代表我们希望groutine多长时间超时,超时以后和上文提到的手动取消一样,也是发送一个信号给ctx.Done()
返回的channel
注意:代码中注释了一行context.WithDeadline其实它和WithTimeout的执行是完全一样的,就是参数不一样而已,这个一会解析源码的时候也会说
打印结果:
原理解析
先看下context源码包,位于$GOPATH/src/context文件夹下,context.go,很简单,总共500多行代码,注释就有200多行
接口
context
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
Context
是一个接口,定义了 4 个方法
Done()
返回一个 channel,可以表示 context 被取消的信号:当这个 channel 被关闭时,说明 context 被取消了。注意,这是一个只读的channel。 我们知道,读一个已关闭的 channel 会读出相应类型的零值。并且源码里没有地方会向这个 channel 里面放入值。换句话说,这是一个 receive-only
的 channel。因此在子协程里读这个 channel,除非被关闭,否则读不出来任何东西。也正是利用了这一点,子协程从 channel 里读出了值(零值)后,就可以做一些收尾工作,尽快退出。
Err()
返回一个错误,表示 channel 被关闭的原因。例如是被取消,还是超时。
Deadline()
返回 context 的截止时间,通过此时间,函数就可以决定是否进行接下来的操作,如果时间太短,就可以不往下做了,否则浪费系统资源。当然,也可以用这个 deadline 来设置一个 I/O 操作的超时时间。
Value()
获取之前设置的 key 对应的 value。
canceler
再来看另一个接口
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
实现了上面定义的两个方法的 Context,就表明该 Context 是可取消的。源码中有两个类型实现了 canceler 接口:cancelCtx 和 timerCtx。
结构体
emptyCtx
源码中给出了一个context接口的空实现,一般用来当源数据
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
这段代码太简单了,一个emptyCtx实现了Context接口,并且每个函数都返回都是空的这是一个空的Context,不会被cancel,不会用来储存值,源码把它包装成了两个对象
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
Background 通常用在 main 函数中,作为所有 context 的根节点。TODO一般是在那种我们不知道要传什么ctx时,作为一个占位符来使用
valueCtx
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
//创建一个带key-value的context
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
valueCtx的结构很简单,匿名字段Context,还有key,value为两个interface用来存储值
先看一下获取valueCtx的方法WithValue
,函数有三个参数,刚好和结构体对应,比如我们上面示例代码那块写的ctx := context.WithValue(context.Background(), "token", "abc")
,这个background就是parent,然后赋值给结构体的第一个参数,最后返回。我们发现ctx是没有其他结构类似SetValue
这种可以设置key和值的方法的,那假如说我想再往里面增加一条数据怎么办呢,我们只能像这样:
ctx := context.WithValue(context.Background(), "a", 1)
ctx = context.WithValue(ctx, "b", 2)
这时候两个ctx变量已经不是同一个context了,第一个ctx变成了第二个ctx的parent,那这个时候第二个ctx是个什么结构,根据上面的代码,生成的ctx结构应该是这样的
valueCtx {
parent: {
parent:emptyCtx //background
key: "a",
value: 1,
}
key:"b",
value:2
}
很像一个倒着的链表,用parent来关联父级,现在再来看看Value
函数,代码很简单,判断key是否和传过来的key相等,如果相等直接返回,否则调用父级的value方法,这里相当于递归寻找,直到最后找到background根节点。
cancelCtx
我们再开看可取消的context实现
type cancelCtx struct {
Context
mu sync.Mutex // 用来加锁
done chan struct{} // 接收关闭信号的channel
children map[canceler]struct{} // 所有的子节点存为一个map
err error // 保存被取消以后的取消原因,cancel或者超时
}
它实现了canceler接口,是一个可取消的context,由于里面也含有一个Context匿名字段,所以可以被看作是Context类型,先来看看怎么创建它
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
代码很简单,初始化一个cancelCtx,和valueCtx一样,把parent赋值给了匿名属性Context,然后返回一个Context和cancel函数,主要看一下propagateCancel
函数做了些什么
func propagateCancel(parent Context, child canceler) {
// 获取父级的done,判断父级是否含有cancelCtx
// 如果父级无cancelCtx,直接返回
// 否则使用map增加关联关系,方便将来cancel父级的时候把子级一起cancel
done := parent.Done()
if done == nil {
return // parent is never canceled
}
// 如果父级已经取消了,取消当前ctx
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// 寻找父级cancelCtx(递归查找)
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 核心!!!!!父级的children中保存当前ctx
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
其实这段代码的核心就两点,判断是否有上级,并且是cancelCtx类型,如果没有,什么也不用做,如果有的话就用上级的children保存当前ctx,方便将来级联取消。
接下来我们看看cancel函数
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
// 获取ctx的done(是个channel),然后关闭这个channel
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 获取下级context,级联取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
这段代码的核心其实就是close这个ctx中的done,还记得上文中提到我们怎么监听协程是否需要关闭:
case <- ctx.Done():
通过监听ctx.Done(),如果拿到值就代表被cancel了,当调用close时,这边的监听就可以拿到数据了。
最后,让我们看下timeCtx的实现
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
// 如果父级也是时间类型的ctx,判断超时时间是否大于父级的超时时间,如果大于其实是不需要生成新的ctx,上文有讲,因为父级超时时间到了会直接调用cancel,把子级也一起取消。
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 生成时间类型ctx,并设置cancelCtx属性
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 上文有专门针对这个函数的讲解
propagateCancel(parent, c)
dur := time.Until(d)
// 如果超时时间小于0,直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 重点!!!!!!!开启定时器,并且到时调用cancel函数
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
代码很简单,如果理解了上面cancel类型的context,那这个无非就是加了一个定时器,到时自动触发cancel函数而已。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: