go 服务中常驻模块的优雅关闭
go 服务中常驻模块的优雅关闭
依赖
主要是使用 github.com/oklog/run
来做常驻服务的管理,同时反转依赖,服务模块依赖 *run.Group
, 模块自身去实现 runner
。
这个库的实现也很简单,感兴趣的可以看一下源码。
main
package main
import (
"log"
"time"
"demo/app"
"demo/servers"
"github.com/gin-gonic/gin"
)
func main() {
httpsrv := servers.NewHTTPServer(":8080")
httpsrv.LoadRoute(Route)
a := app.NewApp()
a.Register(
&servers.SignalWatch{},
httpsrv,
// 其他常驻服务等等
servers.NewPool(1, 10),
)
if err := a.Graceful(); err != nil {
log.Printf("服务异常退出,原因: %s\n", err.Error())
} else {
log.Printf("服务已关闭\n")
}
}
// Route 注册路由,这个方法一般单独放个文件,这里简单列一下
func Route(e *gin.Engine) {
e.GET("/", func(c *gin.Context) {
// 模拟来不及执行的协程,程序退出时应等待执行完成再退出
pool.Go(func() {
// 注意在携程中不要使用父级为 c 或 c.Request.Context() 的上下文
// 假如这里是涉及数据库存储,以 gorm 为例 `db.WithContext(c).Save(&res)`
// http 请求断开时,上下文也会退出,会退出后续操作,数据就不会存储
time.Sleep(time.Second * 5)
log.Println("我执行完啦")
})
c.JSON(200, "ok")
})
}
app
常驻服务的管理
package app
import "github.com/oklog/run"
type runner interface {
Run(wg *run.Group)
}
type App struct {
servers []runner
wg run.Group
}
func NewApp() *App {
return &App{}
}
// Register runner, should before Graceful
func (s *App) Register(f ...runner) {
s.servers = append(s.servers, f...)
}
func (s *App) Graceful() error {
for _, srv := range s.servers {
srv.Run(&s.wg)
}
return s.wg.Run()
}
servers
一些常驻模块,可能不是服务,这里简单写一下
http
package servers
import (
"context"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/oklog/run"
)
type HTTPServer struct {
addr string
engine *gin.Engine
}
type OptionFunc func(*HTTPServer)
func WithEngine(e *gin.Engine) OptionFunc {
return func(s *HTTPServer) {
s.engine = e
}
}
// NewHTTPServer 提供 http 服务模块
func NewHTTPServer(addr string, opts ...OptionFunc) *HTTPServer {
srv := &HTTPServer{
addr: addr,
engine: gin.Default(),
}
for _, opt := range opts {
opt(srv)
}
return srv
}
func (s *HTTPServer) LoadRoute(f func(e *gin.Engine)) {
f(s.engine)
}
func (s *HTTPServer) Run(wg *run.Group) {
srv := &http.Server{
Addr: s.addr,
Handler: s.engine,
}
wg.Add(func() error {
return srv.ListenAndServe()
}, func(err error) {
log.Println("HTTP 服务正在关闭")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Printf("HTTP 服务关闭时出现错误: %s\n", err)
} else {
log.Printf("HTTP 服务成功关闭\n")
}
})
}
signal
package servers
import (
"context"
"log"
"os/signal"
"syscall"
"github.com/oklog/run"
)
// SignalWatch 信号监听模块
type SignalWatch struct {
}
func (s *SignalWatch) Run(wg *run.Group) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
wg.Add(func() error {
<-ctx.Done()
log.Println("监听到退出信号了,开始关闭其他服务")
return nil
}, func(err error) {
cancel()
log.Println("信号监听已关闭")
})
}
// 发现 github.com/oklog/run 提供了信号监听方法,简单列一下
func (s *SignalWatch) RunV2(wg *run.Group) {
e, i := run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
wg.Add(e, i)
}
pool
简单模拟一个协程池,其他更好的实现可参考:
package pool
import (
"context"
"github.com/oklog/run"
"log"
"sync"
"sync/atomic"
)
var (
poolObj *pool
newOnce sync.Once
)
// pool 异步执行任务
type pool struct {
worker uint
jobs chan func()
wg sync.WaitGroup
closed int32
}
func NewPool(worker, cap uint) *pool {
if worker == 0 {
worker = 1
}
if cap == 0 {
cap = 10
}
newOnce.Do(func() {
poolObj = &pool{
jobs: make(chan func(), cap),
worker: worker,
}
})
return poolObj
}
func (p *pool) Run(wg *run.Group) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer p.close()
for i := uint(0); i < p.worker; i++ {
wg.Add(func() error {
for {
select {
case job, ok := <-p.jobs:
if !ok {
return nil
}
p.runJob(job)
case <-ctx.Done():
return nil
}
}
}, func(err error) {
cancel()
})
}
}
func (p *pool) close() {
atomic.StoreInt32(&p.closed, 1)
close(p.jobs)
log.Printf("协程池关闭中\n")
for job := range p.jobs {
p.runJob(job)
}
p.wg.Wait()
log.Printf("协程池已关闭\n")
}
func (p *pool) runJob(f func()) {
p.wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
log.Panic(err)
}
}()
defer p.wg.Done()
f()
}()
}
func Go(f func()) {
// jobs 关闭时再写入会 panic,所以这里退化为同步执行即可
if atomic.LoadInt32(&poolObj.closed) == 1 {
f()
return
}
poolObj.jobs <- f
}
本作品采用《CC 协议》,转载必须注明作者和本文链接