Golang简单制作一个池
1.前置知识
1.代理模式
- 可以用以下的例子来学习
- github.com/crazybber/go-pattern-ex...
2.线程生命周期(先看图) — 了解线程的生命周期即可
- 线程状态: 创建(new),就绪(runnable),运行(run),阻塞(block),结束(dead)
- www.cnblogs.com/sunddenly/p/410656...
3.go runtine
- 了解go runtine 中的运行状态能帮助我们判断输出结果为什么是稀奇古怪的
- blog.csdn.net/kojhliang/article/de...
4.死锁,以及防止死锁, sync.Mutex
- 死锁四种条件: 互斥条件,请求与保持条件,不剥夺条件,循环等待条件
- zhuanlan.zhihu.com/p/61221667
- studygolang.com/pkgdoc
5.sync.WaitGroup 计数器
- 可以减少项目外使用 time.Sleep()
- studygolang.com/articles/12972
2.池思想
- 为了减少内存的重复调用
- 减少上下文切换时间
3.Golang中池思想
- 根据合适的场景运用池,不合适的场景用池反而会适得其反
- 由于go runtine 中的内存调度足够好,不需要进行内存的分配
- 可以通过golang中限制 Goruntine的并发数,来限制内存调度
- segmentfault.com/a/119000001795639...
4.进行实践(如果你看完了前置知识,你已经有能力独自写一个池)
1.确定任务和池的参数
const (
STOPED = iota
RUNNING
)
// Task 任务
type Task struct {
//传入函数
Handler func(v ...interface{})
//需要传入的参数(v)
Params []interface{}
}
// Pool 池
type Pool struct {
//最大容量
MaxCap int
//正在进行的任务数量
active int
//池的状态 RUNNING OR STOPED
status int
//任务通道
chTask chan *Task
//锁
mu sync.Mutex
//用于等待一组线程的结束
wg sync.WaitGroup
}
2.根据代理模式和线程的生命周期创建函数
// NewPool 创建
func NewPool(maxCap int) (*Pool, error) {
return nil,nil
}
// Put 就绪
func (p *Pool) Put(task *Task) error {
return nil
}
// run 运行
func (p *Pool) run() {
}
//worker 阻塞
func (p *Pool) worker() {
}
// Close 结束
func (p *Pool) Close() {
}
3.填充内容
1.创建
// NewPool 创建
// 1.判断容量是否合理
// 2.初始化池
func NewPool(maxCap int) (*Pool, error) {
if maxCap <= 0 {
return nil, errors.New("invalid pool cap")
}
return &Pool{
MaxCap: maxCap,
status: RUNNING,
chTask: make(chan *Task, maxCap),
}, nil
}
2.就绪
// Put 就绪
// 1.加锁 --- 破坏循环等待条件
// 2.判断池的状态
// 3.放入消息队列
// 4.计数器+1
// 5.转到运行态
func (p *Pool) Put(task *Task) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.status == STOPED {
return ErrPoolAlreadyClosed
}
//放入消息队列中
p.chTask <- task
//计数器+1
p.wg.Add(1)
//运行线程
if p.active < p.MaxCap {
p.run()
}
return nil
}
3.运行
// run 运行
// 1.运行数+1
// 2.通过 go runtine 运行
func (p *Pool) run() {
p.active++
go p.worker()
}
4.阻塞
//worker 阻塞
// 1.消费通道中的任务 <-p.chTask
// select在channel为nil时会阻塞队列
// 2.执行函数
// 3.活动数-1
// 4.减少WaitGroup计数器的值
func (p *Pool) worker() {
//延后执行
defer func() {
p.active--
p.wg.Done()
}()
for {
select {
case task, ok := <-p.chTask:
if !ok {
return
}
task.Handler(task.Params...)
}
}
}
5.关闭
// Close 结束
// 1.设置状态为结束
// 2.关闭通道
// 3.阻塞直到WaitGroup计数器减为0
func (p *Pool) Close() {
p.status = STOPED
close(p.chTask)
p.wg.Wait()
}
4.测试
func TestPool(t *testing.T) {
t.Run("pool", func(t *testing.T) {
pool, err := NewPool(20)
defer pool.Close()
assert.Nil(t, err)
for i := 0; i < 20; i++ {
pool.Put(&Task{
Handler: func(v ...interface{}) {
fmt.Println(v)
},
Params: []interface{}{i},
})
}
})
}
5.参考
segmentfault.com/a/119000002146835...
www.cnblogs.com/wongbingming/p/130...
geektutu.com/post/hpg-concurrency-...
本作品采用《CC 协议》,转载必须注明作者和本文链接