Golang简单制作一个池

1.前置知识

1.代理模式

2.线程生命周期(先看图) — 了解线程的生命周期即可

3.go runtine

4.死锁,以及防止死锁, sync.Mutex

5.sync.WaitGroup 计数器

2.池思想

  • 为了减少内存的重复调用
  • 减少上下文切换时间

3.Golang中池思想

  • 根据合适的场景运用池,不合适的场景用池反而会适得其反
  • 由于go runtine 中的内存调度足够好,不需要进行内存的分配
  • 可以通过golang中限制 Goruntine的并发数,来限制内存调度
  • segmentfault.com/a/119000001795639...

4.进行实践(如果你看完了前置知识,你已经有能力独自写一个池)

1.确定任务和池的参数

const (
    STOPED = iota
    RUNNING
)

// Task 任务
type Task struct {
    //传入函数
    Handler func(v ...interface{})
    //需要传入的参数(v)
    Params  []interface{}
}

// Pool 池
type Pool struct {
    //最大容量
    MaxCap int
    //正在进行的任务数量
    active int
    //池的状态 RUNNING OR STOPED
    status int
    //任务通道
    chTask chan *Task
    //锁
    mu     sync.Mutex
    //用于等待一组线程的结束
    wg     sync.WaitGroup
}

2.根据代理模式和线程的生命周期创建函数

// NewPool 创建
func NewPool(maxCap int) (*Pool, error) {
    return nil,nil
}


// Put 就绪
func (p *Pool) Put(task *Task) error {
    return nil
}

// run 运行
func (p *Pool) run() {

}

//worker 阻塞
func (p *Pool) worker() {

}

// Close 结束
func (p *Pool) Close() {
}

3.填充内容

1.创建

// NewPool 创建
// 1.判断容量是否合理
// 2.初始化池
func NewPool(maxCap int) (*Pool, error) {
    if maxCap <= 0 {
        return nil, errors.New("invalid pool cap")
    }

    return &Pool{
        MaxCap: maxCap,
        status: RUNNING,
        chTask: make(chan *Task, maxCap),
    }, nil
}

2.就绪

// Put 就绪
// 1.加锁 --- 破坏循环等待条件
// 2.判断池的状态
// 3.放入消息队列
// 4.计数器+1
// 5.转到运行态
func (p *Pool) Put(task *Task) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.status == STOPED {
        return ErrPoolAlreadyClosed
    }

    //放入消息队列中
    p.chTask <- task
    //计数器+1
    p.wg.Add(1)

    //运行线程
    if p.active < p.MaxCap {
        p.run()
    }
    return nil
}

3.运行

// run 运行
// 1.运行数+1
// 2.通过 go runtine 运行
func (p *Pool) run() {
    p.active++
    go p.worker()
}

4.阻塞

//worker 阻塞
// 1.消费通道中的任务 <-p.chTask
//   select在channel为nil时会阻塞队列
// 2.执行函数
// 3.活动数-1
// 4.减少WaitGroup计数器的值
func (p *Pool) worker() {
    //延后执行
    defer func() {
        p.active--
        p.wg.Done()
    }()

    for {
        select {
        case task, ok := <-p.chTask:
            if !ok {
                return
            }
            task.Handler(task.Params...)
        }
    }
}

5.关闭

// Close 结束
// 1.设置状态为结束
// 2.关闭通道
// 3.阻塞直到WaitGroup计数器减为0
func (p *Pool) Close() {
    p.status = STOPED

    close(p.chTask)

    p.wg.Wait()
}

4.测试

func TestPool(t *testing.T) {
    t.Run("pool", func(t *testing.T) {

        pool, err := NewPool(20)
        defer pool.Close()
        assert.Nil(t, err)

        for i := 0; i < 20; i++ {
            pool.Put(&Task{
                Handler: func(v ...interface{}) {
                    fmt.Println(v)
                },
                Params: []interface{}{i},
            })
        }
    })
}

5.参考

segmentfault.com/a/119000002146835...

www.cnblogs.com/wongbingming/p/130...

geektutu.com/post/hpg-concurrency-...

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!