一种多协程跑脚本的写法

有时候数据缺失,需要写脚本从其他地方的接口获取数据。
1.采用生产-消费模式,只需配置start,end,生产者数量,消费者数量。
2.监听中断信号,安全退出

package main
import (
    "fmt"
    "os"
    "os/signal"
    "runtime"
    "strconv"
    "sync"
    "syscall"
    "time"
)
//多协程跑脚本任务
type Handler struct {
    TaskCh      chan int
    ResultCh    chan string
    Start       int
    End         int
    wgWork      sync.WaitGroup
    wgConsumer  sync.WaitGroup
    WorkerNum   int
    ConsumerNum int
    InterruptCh chan os.Signal //接受os信号
    IsClose     bool
}
func main() {
    handler := NewHandler()
    handler.Init()
    handler.Serve()
}
func NewHandler() *Handler {
    return &Handler{
        TaskCh:      make(chan int),
        ResultCh:    make(chan string),
        Start:       0,
        End:         100,
        WorkerNum:   3,
        ConsumerNum: 3,
        wgWork:      sync.WaitGroup{},
        wgConsumer:  sync.WaitGroup{},
        InterruptCh: make(chan os.Signal),
        IsClose:     false,
    }
}
func (h *Handler) Init() {
    runtime.GOMAXPROCS(256)
    if h.Start > h.End {
        panic("start > End")
    }
    signal.Notify(h.InterruptCh, os.Interrupt, syscall.SIGTERM) //接受系统中断信号,会写入通道
}
func (h *Handler) Serve() {
    //1.把任务写入队列
    go func(s, e int) {
        defer close(h.TaskCh) //1.1 任务全部写入后及时关闭通道
        for i := s; i <= e; i++ {
            if h.IsClose {
                return
            }
            h.TaskCh <- i
        }
    }(h.Start, h.End)
    //2.创建worker,每个worker循环监听任务队列
    h.wgWork.Add(h.WorkerNum)
    for j := 0; j < h.WorkerNum; j++ {
        go h.StartWork()
    }
    //3.返回结果,循环读取结果队列
    h.wgConsumer.Add(h.ConsumerNum)
    for i := 0; i < h.ConsumerNum; i++ {
        go h.Consumer()
    }
    //4.监听系统信号,提前关闭任务
    go func() {
        <-h.InterruptCh
        fmt.Print("receive os signal closing \n")
        h.Close()
    }()
    //wait是等待worker协程,worker完成后,结果通道也就没有数据写了,要及时关掉
    h.wgWork.Wait()
    close(h.ResultCh)
    h.wgConsumer.Wait()
}
//worker是死循环,任务读完退出
func (h *Handler) StartWork() {
    defer func() {        
        h.wgWork.Done()
    }()
    for {
        select {
        case t, ok := <-h.TaskCh:
            if ok == false {
                return
            }
            res := GetData(t)
            h.ResultCh <- res
            fmt.Println("input result:", res)
        }
    }
}
func (h *Handler) Consumer() {
    defer h.wgConsumer.Done()
    for res := range h.ResultCh {
        time.Sleep(1e9)
        fmt.Println("res:", res)
    }
}
func (h *Handler) Close() {
    h.IsClose = true
}
func GetData(number int) string {
    return strconv.Itoa(number)
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
用过哪些工具?为啥用这个工具(速度快,支持高并发...)?底层如何实现的?
讨论数量: 1

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!