A Multi-Level Memory Pool for Byte Slices
First version
package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool is a multi-level memory pool for byte slices.
type BytesPool struct {
    pools map[int]*sync.Pool
    step  int
    mux   sync.RWMutex
}

// NewBytesPool creates a multi-level byte-slice memory pool.
func NewBytesPool(step int) *BytesPool {
    return &BytesPool{
        pools: map[int]*sync.Pool{},
        step:  step,
        mux:   sync.RWMutex{},
    }
}

// Get returns a buffer with at least the requested capacity.
func (r *BytesPool) Get(capacity int) *bytes.Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*bytes.Buffer)
    }
    return bytes.NewBuffer(make([]byte, 0, poolIndex*r.step))
}

// Put returns a buffer to the pool.
func (r *BytesPool) Put(b *bytes.Buffer) {
    poolIndex := (b.Cap() + r.step - 1) / r.step
    b.Truncate(0)
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        pool.Put(b)
        return
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if ok {
        r.mux.Unlock()
        pool.Put(b)
        return
    }
    pool = &sync.Pool{
        New: func() any {
            return bytes.NewBuffer(make([]byte, 0, poolIndex*r.step))
        },
    }
    r.pools[poolIndex] = pool
    r.mux.Unlock()
    pool.Put(b)
}
Because Get computes the bucket key from the requested capacity while Put computes it from the buffer's actual capacity after internal growth, a returned buffer can land in (and even create) a different bucket, splitting the pool.
Reproduction:
func TestPoolSplit(t *testing.T) {
    p := NewBytesPool(64)

    // Request 100 B.
    b1 := p.Get(100)
    t.Log("Get index=2, Cap=", b1.Cap()) // 128

    // Simulate internal growth.
    b1.Write(make([]byte, 300))
    t.Log("After write Cap=", b1.Cap()) // well above 128; the exact value depends on bytes.Buffer's growth policy

    // Return it to the pool.
    p.Put(b1)

    // Request 100 B again; it should reuse the same buffer.
    b2 := p.Get(100)
    if b1 != b2 {
        t.Log("Different buffer returned → pool split!")
    }
}
Second version
package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool is a multi-level memory pool for byte slices.
type BytesPool struct {
    pools map[int]*sync.Pool
    step  int
    mux   sync.RWMutex
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool creates a multi-level byte-slice memory pool.
func NewBytesPool(step int) *BytesPool {
    return &BytesPool{
        pools: map[int]*sync.Pool{},
        step:  step,
        mux:   sync.RWMutex{},
    }
}

// Get returns a buffer with at least the requested capacity.
func (r *BytesPool) Get(capacity int) *Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*Buffer)
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if !ok {
        pool = &sync.Pool{
            New: func() any {
                return &Buffer{
                    Buffer:    bytes.NewBuffer(make([]byte, 0, poolIndex*r.step)),
                    poolIndex: poolIndex,
                }
            },
        }
        r.pools[poolIndex] = pool
    }
    r.mux.Unlock()
    return pool.Get().(*Buffer)
}

// Put returns a buffer to the pool.
func (r *BytesPool) Put(b *Buffer) {
    b.Reset()
    wantIndex := (b.Cap() + r.step - 1) / r.step
    r.mux.RLock()
    if wantIndex == b.poolIndex {
        // The buffer has not grown: return it to its original pool.
        pool := r.pools[wantIndex]
        r.mux.RUnlock()
        pool.Put(b)
        return
    }
    // The buffer has grown: check whether a pool for the new size exists.
    pool, ok := r.pools[wantIndex]
    if ok {
        r.mux.RUnlock()
        // The larger pool exists, so there is demand for this size:
        // hand the buffer over to the larger pool for reuse.
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    // The larger pool does not exist, so there is no demand for this size yet:
    // return the buffer to its original pool for later reuse.
    pool = r.pools[b.poolIndex]
    r.mux.RUnlock()
    pool.Put(b)
}
This version addresses the pool-split problem (a test sketch follows the list):
- Not grown → return to the original pool
- Grown and the larger pool already exists → move to the larger pool
- Grown but the larger pool has not been created → fall back to the original pool
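A minimal sketch of the third rule, written as a test in the same package; step = 64 and the concrete bucket numbers are illustrative assumptions, not values from the article.

func TestPutRouting(t *testing.T) {
    p := NewBytesPool(64)

    b := p.Get(100)            // bucket 2, Cap 128
    b.Write(make([]byte, 600)) // Cap grows well past 128

    // No larger bucket has ever been requested, so Put keeps the buffer in
    // bucket 2 instead of splitting the pool.
    p.Put(b)

    // Reading b after Put is only safe here because the test is single-goroutine.
    if b.poolIndex != 2 {
        t.Fatalf("expected buffer to stay in bucket 2, got %d", b.poolIndex)
    }
}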
In practice a few bucket sizes usually cover most requests, but an occasional oversized request can leave the pool holding very large buffers. Since such buffers are rarely reused, they amount to wasted memory, so a maximum-capacity limit is needed.
Third version
package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool is a multi-level memory pool for byte slices.
type BytesPool struct {
    pools       map[int]*sync.Pool
    step        int
    maxCapacity int
    mux         sync.RWMutex
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool creates a multi-level byte-slice memory pool.
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    return &BytesPool{
        pools:       map[int]*sync.Pool{},
        step:        step,
        maxCapacity: maxCapacity,
        mux:         sync.RWMutex{},
    }
}

// Get returns a buffer with at least the requested capacity.
func (r *BytesPool) Get(capacity int) *Buffer {
    // Requests above the pool's managed size are allocated directly.
    if capacity > r.maxCapacity {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*Buffer)
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if !ok {
        pool = &sync.Pool{
            New: func() any {
                return &Buffer{
                    Buffer:    bytes.NewBuffer(make([]byte, 0, poolIndex*r.step)),
                    poolIndex: poolIndex,
                }
            },
        }
        r.pools[poolIndex] = pool
    }
    r.mux.Unlock()
    return pool.Get().(*Buffer)
}

// Put returns a buffer to the pool.
func (r *BytesPool) Put(b *Buffer) {
    // Buffers above the pool's managed size are not reused, to avoid
    // holding on to large memory indefinitely.
    if b.poolIndex == -1 {
        return
    }
    b.Reset()
    wantIndex := (b.Cap() + r.step - 1) / r.step
    r.mux.RLock()
    if wantIndex == b.poolIndex {
        // The buffer has not grown: return it to its original pool.
        pool := r.pools[wantIndex]
        r.mux.RUnlock()
        pool.Put(b)
        return
    }
    // The buffer has grown: check whether a pool for the new size exists.
    pool, ok := r.pools[wantIndex]
    if ok {
        r.mux.RUnlock()
        // The larger pool exists, so there is demand for this size:
        // hand the buffer over to the larger pool for reuse.
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    // The larger pool does not exist, so there is no demand for this size yet:
    // return the buffer to its original pool for later reuse.
    pool = r.pools[b.poolIndex]
    r.mux.RUnlock()
    pool.Put(b)
}
Since maxCapacity bounds the pool, the largest poolIndex is known in advance, so a slice can replace map[int]*sync.Pool and the lock overhead goes away.
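As a worked example of that sizing math (step = 64 and maxCapacity = 1024 are illustrative values, not taken from the article):

package main

import "fmt"

func main() {
    step, maxCapacity := 64, 1024

    // Bucket i holds buffers of capacity i*step, so the largest bucket index
    // is ceil(maxCapacity/step) and the slice has a fixed, known length.
    maxPoolIndex := (maxCapacity + step - 1) / step
    fmt.Println("maxPoolIndex:", maxPoolIndex)   // 16
    fmt.Println("slice length:", maxPoolIndex+1) // 17 (index 0 is unused)

    // A 100 B request maps to bucket ceil(100/64) = 2, i.e. a 128 B buffer.
    poolIndex := (100 + step - 1) / step
    fmt.Println("bucket:", poolIndex, "capacity:", poolIndex*step) // 2 128
}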
Fourth version
package memoryPool

import (
    "bytes"
    "sync"
    "sync/atomic"
)

type syncPool struct {
    sync.Pool
    used *int32
}

// BytesPool is a multi-level memory pool for byte slices.
type BytesPool struct {
    pools        []*syncPool
    maxPoolIndex int
    step         int
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool creates a multi-level byte-slice memory pool.
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    maxPoolIndex := (maxCapacity + step - 1) / step
    pools := make([]*syncPool, maxPoolIndex+1)
    for i := 1; i <= maxPoolIndex; i++ {
        ix := i
        p := syncPool{
            Pool: sync.Pool{
                New: func() any {
                    return &Buffer{
                        Buffer:    bytes.NewBuffer(make([]byte, 0, ix*step)),
                        poolIndex: ix,
                    }
                },
            },
            used: new(int32),
        }
        pools[i] = &p
    }
    return &BytesPool{pools: pools, step: step, maxPoolIndex: maxPoolIndex}
}

// Get returns a buffer with at least the requested capacity.
func (r *BytesPool) Get(capacity int) *Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    if poolIndex > r.maxPoolIndex {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    atomic.StoreInt32(r.pools[poolIndex].used, 1)
    return r.pools[poolIndex].Get().(*Buffer)
}

// Put returns a buffer to the pool.
func (r *BytesPool) Put(b *Buffer) {
    // Buffers above the pool's managed size are not reused, to avoid
    // holding on to large memory indefinitely.
    wantIndex := (b.Cap() + r.step - 1) / r.step
    if wantIndex > r.maxPoolIndex {
        return
    }
    b.Reset()
    // The buffer has not grown: return it to its original pool.
    if wantIndex == b.poolIndex {
        r.pools[wantIndex].Put(b)
        return
    }
    // The buffer has grown: if there has been demand for the larger size,
    // hand the buffer over to the larger pool for reuse.
    pool := r.pools[wantIndex]
    if atomic.LoadInt32(pool.used) == 1 {
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    // No demand for the larger size yet: return the buffer to its original
    // pool for later reuse.
    r.pools[b.poolIndex].Put(b)
}
This version introduces a used flag that records whether a given size has ever been requested, and uses it to decide where a grown buffer should be returned. But used is a one-shot flag: once it is set, even if that size is never requested again, grown buffers will keep being parked in a rarely used pool.
Fifth version
package memoryPool

import (
    "bytes"
    "sync"
    "sync/atomic"
    "time"
)

type syncPool struct {
    sync.Pool
    lastUsed *uint32
}

// BytesPool is a multi-level memory pool for byte slices.
type BytesPool struct {
    pools        []*syncPool
    maxPoolIndex int
    step         int
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool creates a multi-level byte-slice memory pool.
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    if step <= 0 || maxCapacity <= 0 {
        panic("step and maxCapacity must be positive")
    }
    maxPoolIndex := (maxCapacity + step - 1) / step
    pools := make([]*syncPool, maxPoolIndex+1)
    for i := 1; i <= maxPoolIndex; i++ {
        ix := i
        p := syncPool{
            Pool: sync.Pool{
                New: func() any {
                    return &Buffer{
                        Buffer:    bytes.NewBuffer(make([]byte, 0, ix*step)),
                        poolIndex: ix,
                    }
                },
            },
            lastUsed: new(uint32),
        }
        pools[i] = &p
    }
    return &BytesPool{pools: pools, step: step, maxPoolIndex: maxPoolIndex}
}

// Get returns a buffer with at least the requested capacity.
func (r *BytesPool) Get(capacity int) *Buffer {
    if capacity <= 0 {
        capacity = 1
    }
    poolIndex := (capacity + r.step - 1) / r.step
    if poolIndex > r.maxPoolIndex {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    atomic.StoreUint32(r.pools[poolIndex].lastUsed, uint32(time.Now().Unix()))
    return r.pools[poolIndex].Get().(*Buffer)
}

// Put returns a buffer to the pool.
func (r *BytesPool) Put(b *Buffer) {
    // Buffers above the pool's managed size are not reused, to avoid
    // holding on to large memory indefinitely.
    wantIndex := (b.Cap() + r.step - 1) / r.step
    if wantIndex > r.maxPoolIndex {
        return
    }
    b.Reset()
    // The buffer has not grown: return it to its original pool.
    if wantIndex == b.poolIndex {
        r.pools[wantIndex].Put(b)
        return
    }
    // The buffer has grown: check whether the target pool has been used
    // "recently" (within 120 seconds here).
    pool := r.pools[wantIndex]
    if uint32(time.Now().Unix())-atomic.LoadUint32(pool.lastUsed) <= 120 {
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    // Otherwise return the buffer to its original pool.
    r.pools[b.poolIndex].Put(b)
}
This version checks whether the target pool has been used recently. If it has, it is a hot pool and the grown buffer is handed over to it; otherwise it is a cold pool and the buffer goes back to its original pool.
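A minimal usage sketch of this final version, written as a test in the same package; step = 64, maxCapacity = 64*1024, and the request sizes are illustrative assumptions, not values from the article.

func TestUsage(t *testing.T) {
    pool := NewBytesPool(64, 64*1024)

    buf := pool.Get(300)     // served from the 320 B bucket (index 5)
    buf.WriteString("hello") // used like any *bytes.Buffer
    pool.Put(buf)            // Reset and returned for reuse

    huge := pool.Get(1 << 20) // above maxCapacity: allocated outside the pool
    pool.Put(huge)            // dropped, so the pool never retains 1 MB buffers
}

Callers always pair Get with Put; oversized buffers simply fall out of the pool instead of pinning memory.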
This work is licensed under the CC License; reposting must credit the author and link to the original article.