Graceful shutdown of long-running modules in a Go service

Dependencies

The core is github.com/oklog/run for managing long-running services, with the dependency inverted: each service module depends on *run.Group and implements the runner interface itself.

The library's implementation is very simple; if you are curious, its source code is worth a read.
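
For reference, a run.Group actor is an execute/interrupt pair: g.Run starts every execute function in its own goroutine, waits for the first one to return, then calls every interrupt function and waits for the remaining actors before returning the first error. A minimal standalone sketch of that behavior (not part of the project code):

package main

import (
    "log"
    "time"

    "github.com/oklog/run"
)

func main() {
    var g run.Group
    done := make(chan struct{})

    // Actor 1: a worker that runs until it is told to stop.
    g.Add(func() error {
        <-done
        return nil
    }, func(err error) {
        close(done)
    })

    // Actor 2: returns after one second, which makes g.Run call every
    // interrupt function registered above.
    g.Add(func() error {
        time.Sleep(time.Second)
        return nil
    }, func(err error) {})

    // Run blocks until all actors have returned and reports the error of
    // the actor that stopped first (nil here).
    log.Println(g.Run())
}

That "first actor to stop interrupts everyone" rule is what lets the signal watcher below take the whole service down cleanly.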

main

package main

import (
    "log"
    "time"

    "demo/app"
    "demo/pool"
    "demo/servers"

    "github.com/gin-gonic/gin"
)

func main() {
    httpsrv := servers.NewHTTPServer(":8080")
    httpsrv.LoadRoute(Route)

    a := app.NewApp()

    a.Register(
        &servers.SignalWatch{},
        httpsrv,
        // other long-running modules can be registered here as well
        pool.NewPool(1, 10),
    )

    if err := a.Graceful(); err != nil {
        log.Printf("service exited abnormally: %s\n", err.Error())
    } else {
        log.Printf("service shut down cleanly\n")
    }
}

// Route registers the routes. It would normally live in its own file; it is inlined here to keep the example short.
func Route(e *gin.Engine) {
    e.GET("/", func(c *gin.Context) {
        // Simulate a goroutine that may not finish before shutdown; the program
        // should wait for it to complete before exiting.
        pool.Go(func() {
            // Inside the goroutine, do not use a context derived from c or c.Request.Context().
            // For example with gorm, `db.WithContext(c).Save(&res)`: when the HTTP request
            // ends, that context is canceled, the pending operations are aborted,
            // and the data never gets saved.
            time.Sleep(time.Second * 5)
            log.Println("job finished")
        })
        })
        c.JSON(200, "ok")
    })
}
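
To make the comment about contexts in the handler concrete, here is a sketch of a variant that hands the background job a context detached from the request. RouteWithDetachedContext and doSave are made-up names, context.WithoutCancel needs Go 1.21 or newer, and the context import would have to be added to this file:

// RouteWithDetachedContext passes the job a context that keeps the request's
// values (trace IDs and the like) but is not canceled when the request ends.
func RouteWithDetachedContext(e *gin.Engine) {
    e.GET("/save", func(c *gin.Context) {
        // c.Request.Context() is canceled as soon as the request finishes,
        // so it must not be handed to work that outlives the request.
        ctx := context.WithoutCancel(c.Request.Context())

        pool.Go(func() {
            doSave(ctx) // e.g. db.WithContext(ctx).Save(&res) with gorm
        })
        c.JSON(200, "ok")
    })
}

// doSave stands in for whatever persistence the job actually performs.
func doSave(ctx context.Context) {
    _ = ctx
    time.Sleep(time.Second * 5)
    log.Println("saved")
}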

app

Management of the registered long-running services.

package app

import "github.com/oklog/run"

type runner interface {
    Run(wg *run.Group)
}

type App struct {
    servers []runner
    wg      run.Group
}

func NewApp() *App {
    return &App{}
}

// Register adds runners; it must be called before Graceful.
func (s *App) Register(f ...runner) {
    s.servers = append(s.servers, f...)
}

func (s *App) Graceful() error {
    for _, srv := range s.servers {
        srv.Run(&s.wg)
    }
    return s.wg.Run()
}
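
Any module that implements Run(wg *run.Group) can be registered this way. As an illustration of the "other long-running modules" mentioned in main (this type is made up, not part of the original code), a ticker-driven worker might look like:

package servers

import (
    "context"
    "log"
    "time"

    "github.com/oklog/run"
)

// Ticker is a hypothetical module that does some periodic work until shutdown.
type Ticker struct {
    Interval time.Duration
}

func (t *Ticker) Run(wg *run.Group) {
    ctx, cancel := context.WithCancel(context.Background())
    wg.Add(func() error {
        tick := time.NewTicker(t.Interval)
        defer tick.Stop()
        for {
            select {
            case <-tick.C:
                log.Println("tick: doing periodic work")
            case <-ctx.Done():
                return nil
            }
        }
    }, func(err error) {
        // Called when any actor in the group stops; release the loop above.
        cancel()
    })
}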

servers

A few long-running modules; not all of them are strictly servers. They are sketched briefly below.

http
package servers

import (
    "context"
    "log"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/oklog/run"
)

type HTTPServer struct {
    addr   string
    engine *gin.Engine
}

type OptionFunc func(*HTTPServer)

func WithEngine(e *gin.Engine) OptionFunc {
    return func(s *HTTPServer) {
        s.engine = e
    }
}

// NewHTTPServer builds the HTTP server module.
func NewHTTPServer(addr string, opts ...OptionFunc) *HTTPServer {
    srv := &HTTPServer{
        addr:   addr,
        engine: gin.Default(),
    }
    for _, opt := range opts {
        opt(srv)
    }

    return srv
}

func (s *HTTPServer) LoadRoute(f func(e *gin.Engine)) {
    f(s.engine)
}

func (s *HTTPServer) Run(wg *run.Group) {
    srv := &http.Server{
        Addr:    s.addr,
        Handler: s.engine,
    }

    wg.Add(func() error {
        return srv.ListenAndServe()
    }, func(err error) {
        log.Println("HTTP 服务正在关闭")
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := srv.Shutdown(ctx); err != nil {
            log.Printf("HTTP 服务关闭时出现错误: %s\n", err)
        } else {
            log.Printf("HTTP 服务成功关闭\n")
        }
    })
}
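
A possible refinement (not in the original): binding the listener before registering the actor makes an occupied port fail at startup instead of only surfacing once the group is running. A sketch against the same struct, assuming a net import and a caller that checks the returned error:

// RunWithListener is a variant of Run that binds the address up front.
func (s *HTTPServer) RunWithListener(wg *run.Group) error {
    ln, err := net.Listen("tcp", s.addr)
    if err != nil {
        return err // e.g. address already in use, reported before the group starts
    }

    srv := &http.Server{Handler: s.engine}
    wg.Add(func() error {
        return srv.Serve(ln)
    }, func(err error) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = srv.Shutdown(ctx) // Shutdown also closes the listener
    })
    return nil
}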

signal
package servers

import (
    "context"
    "log"
    "os/signal"
    "syscall"

    "github.com/oklog/run"
)

// SignalWatch listens for OS signals and triggers the shutdown of the other actors.
type SignalWatch struct {
}

func (s *SignalWatch) Run(wg *run.Group) {
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
    wg.Add(func() error {
        <-ctx.Done()
        log.Println("监听到退出信号了,开始关闭其他服务")
        return nil
    }, func(err error) {
        cancel()
        log.Println("信号监听已关闭")
    })
}

// It turns out github.com/oklog/run already ships a signal-handling actor; RunV2 shows how to use it.
func (s *SignalWatch) RunV2(wg *run.Group) {
    e, i := run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
    wg.Add(e, i)
}
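
Note that with RunV2 a received signal makes the group return a run.SignalError rather than nil, so the check in main would log a clean Ctrl-C as an abnormal exit. If a signal should count as a normal shutdown, the caller can unwrap it; a small sketch (logExit is a made-up helper, and main would call logExit(a.Graceful())):

package main

import (
    "errors"
    "log"

    "github.com/oklog/run"
)

// logExit reports how the run.Group ended, treating an OS signal as a
// normal shutdown instead of an error.
func logExit(err error) {
    if err == nil {
        log.Println("service shut down cleanly")
        return
    }
    var sigErr run.SignalError
    if errors.As(err, &sigErr) {
        log.Printf("shut down on signal: %s\n", sigErr.Signal)
        return
    }
    log.Printf("service exited abnormally: %s\n", err)
}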

pool

A quick goroutine pool to simulate the idea; better implementations exist, and one alternative is sketched at the end of this section.

package pool

import (
    "context"
    "log"
    "sync"
    "sync/atomic"

    "github.com/oklog/run"
)

var (
    poolObj *pool
    newOnce sync.Once
)

// pool executes submitted jobs asynchronously on a fixed number of workers.
type pool struct {
    worker uint
    jobs   chan func()
    wg     sync.WaitGroup

    closed int32
}

func NewPool(worker, cap uint) *pool {
    if worker == 0 {
        worker = 1
    }
    if cap == 0 {
        cap = 10
    }
    newOnce.Do(func() {
        poolObj = &pool{
            jobs:   make(chan func(), cap),
            worker: worker,
        }
    })
    return poolObj
}

func (p *pool) Run(wg *run.Group) {
    ctx, cancel := context.WithCancel(context.Background())

    wg.Add(func() error {
        // Start the workers and block until they have all exited, so the
        // run.Group keeps this actor alive while jobs are being processed.
        var workers sync.WaitGroup
        for i := uint(0); i < p.worker; i++ {
            workers.Add(1)
            go func() {
                defer workers.Done()
                for {
                    select {
                    case job, ok := <-p.jobs:
                        if !ok {
                            return
                        }
                        p.runJob(job)
                    case <-ctx.Done():
                        return
                    }
                }
            }()
        }
        workers.Wait()
        return nil
    }, func(err error) {
        // On shutdown: stop accepting new jobs, drain what is already queued,
        // wait for in-flight jobs, then release the workers.
        p.close()
        cancel()
    })
}

func (p *pool) close() {
    atomic.StoreInt32(&p.closed, 1)
    close(p.jobs)
    log.Printf("goroutine pool shutting down\n")
    // A closed channel keeps delivering its buffered values, so ranging over
    // it here drains any jobs that were still queued.
    for job := range p.jobs {
        p.runJob(job)
    }
    p.wg.Wait()
    log.Printf("goroutine pool closed\n")
}


func (p *pool) runJob(f func()) {
    p.wg.Add(1)
    go func() {
        defer func() {
            // Recover so a panicking job cannot bring the whole service down;
            // log.Panic would re-panic here, so only log the value.
            if err := recover(); err != nil {
                log.Printf("job panicked: %v\n", err)
            }
        }()
        defer p.wg.Done()
        f()
    }()
}

func Go(f func()) {
    // Sending on jobs after it has been closed would panic, so once the pool
    // is closed the job degrades to running synchronously. (In this setup the
    // HTTP server is shut down before the pool, so handlers have stopped
    // calling Go by the time the channel is closed.)
    if atomic.LoadInt32(&poolObj.closed) == 1 {
        f()
        return
    }
    poolObj.jobs <- f
}
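
As one concrete example of the better implementations mentioned at the top of this section (my suggestion, not necessarily what the original referred to), golang.org/x/sync/errgroup with SetLimit gives a bounded pool whose Wait call provides the same drain-before-exit guarantee:

package main

import (
    "log"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.SetLimit(10) // at most 10 jobs run concurrently; Go blocks when the limit is reached

    for i := 0; i < 100; i++ {
        i := i
        g.Go(func() error {
            time.Sleep(100 * time.Millisecond)
            log.Println("job", i, "done")
            return nil
        })
    }

    // Wait blocks until every submitted job has returned, which is the
    // "finish queued work before exiting" behaviour the pool above provides.
    _ = g.Wait()
}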