字节切片的多级内存池

AI摘要
本文展示了Go语言多级内存池的五个迭代版本。第一版存在池分裂问题;第二版通过记录原始索引解决分裂;第三版引入容量上限避免内存浪费;第四版用数组替代映射提升性能;第五版基于时间戳判断池使用频率,优化内存分配策略。核心演进路径:解决分裂→控制内存→提升效率→智能回收。

第一个版本

package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool 字节切片的多级内存池
type BytesPool struct {
    pools map[int]*sync.Pool
    step  int
    mux   sync.RWMutex
}

// NewBytesPool 创建字节切片的多级内存池
func NewBytesPool(step int) *BytesPool {
    return &BytesPool{
        pools: map[int]*sync.Pool{},
        step:  step,
        mux:   sync.RWMutex{},
    }
}

// Get 获取字节切片
func (r *BytesPool) Get(capacity int) *bytes.Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*bytes.Buffer)
    }
    return bytes.NewBuffer(make([]byte, 0, poolIndex*r.step))
}

// Put 归还字节切片
func (r *BytesPool) Put(b *bytes.Buffer) {
    poolIndex := (b.Cap() + r.step - 1) / r.step
    b.Truncate(0)
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        pool.Put(b)
        return
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if ok {
        r.mux.Unlock()
        pool.Put(b)
        return
    }
    pool = &sync.Pool{
        New: func() any {
            return bytes.NewBuffer(make([]byte, 0, poolIndex*r.step))
        },
    }
    r.pools[poolIndex] = pool
    r.mux.Unlock()
    pool.Put(b)
}

由于GetPut 使用不同capacity计算key,会导致池分裂。
复现代码:

func TestPoolSplit(t *testing.T) {
    p := NewBytesPool(64)
    // 请求 100 B
    b1 := p.Get(100)
    t.Log("Get index=2, Cap=", b1.Cap()) // 128
    // 模拟内部扩容
    b1.Write(make([]byte, 300))
    t.Log("After write Cap=", b1.Cap()) // >=512
    // 归还
    p.Put(b1)
    // 再请求 100 B,应该复用同一个池
    b2 := p.Get(100)
    if b1 != b2 {
        t.Log("Different buffer returned → 池分裂!")
    }
}

第二个版本

package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool 字节切片的多级内存池
type BytesPool struct {
    pools map[int]*sync.Pool
    step  int
    mux   sync.RWMutex
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool 创建字节切片的多级内存池
func NewBytesPool(step int) *BytesPool {
    return &BytesPool{
        pools: map[int]*sync.Pool{},
        step:  step,
        mux:   sync.RWMutex{},
    }
}

// Get 获取字节切片
func (r *BytesPool) Get(capacity int) *Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*Buffer)
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if !ok {
        pool = &sync.Pool{
            New: func() any {
                return &Buffer{
                    Buffer:    bytes.NewBuffer(make([]byte, 0, poolIndex*r.step)),
                    poolIndex: poolIndex,
                }
            },
        }
        r.pools[poolIndex] = pool
    }
    r.mux.Unlock()
    return pool.Get().(*Buffer)
}

// Put 归还字节切片
func (r *BytesPool) Put(b *Buffer) {
    b.Reset()
    wantIndex := (b.Cap() + r.step - 1) / r.step
    r.mux.RLock()
    if wantIndex == b.poolIndex {
        //内存尺寸没有扩张,直接归还
        pool := r.pools[wantIndex]
        r.mux.RUnlock()
        pool.Put(b)
        return
    }
    //内存尺寸已扩张,检查扩张后对应的pool是否存在
    pool, ok := r.pools[wantIndex]
    if ok {
        r.mux.RUnlock()
        //内存尺寸扩张后,对应的pool存在,说明已经有需求创建更大的内存尺寸,直接归还新的池子中进行复用
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    //内存尺寸扩张后,对应的pool不存在,说明当前没有创建更大内存尺寸的需求,继续归还到原来的池子中,以便后续复用
    pool = r.pools[b.poolIndex]
    r.mux.RUnlock()
    pool.Put(b)
}

改进池分裂问题:

  • 没扩张 → 回原池
  • 扩张且大池已存在 → 进大池
  • 扩张但大池未创建 → 退回原池

使用内存池时,如果大部分情况下某几个尺寸即可满足,但是偶发的会尺寸要求可能导致内存池持有超大尺寸的内存,这个超大尺寸的内存又不是高频使用的,可能导致浪费,所以需要新增最大尺寸限制。

第三个版

package memoryPool

import (
    "bytes"
    "sync"
)

// BytesPool 字节切片的多级内存池
type BytesPool struct {
    pools       map[int]*sync.Pool
    step        int
    maxCapacity int
    mux         sync.RWMutex
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool 创建字节切片的多级内存池
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    return &BytesPool{
        pools:       map[int]*sync.Pool{},
        step:        step,
        maxCapacity: maxCapacity,
        mux:         sync.RWMutex{},
    }
}

// Get 获取字节切片
func (r *BytesPool) Get(capacity int) *Buffer {
    //超过内存池管理的尺寸,直接创建
    if capacity > r.maxCapacity {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    poolIndex := (capacity + r.step - 1) / r.step
    r.mux.RLock()
    pool, ok := r.pools[poolIndex]
    r.mux.RUnlock()
    if ok {
        return pool.Get().(*Buffer)
    }
    r.mux.Lock()
    pool, ok = r.pools[poolIndex]
    if !ok {
        pool = &sync.Pool{
            New: func() any {
                return &Buffer{
                    Buffer:    bytes.NewBuffer(make([]byte, 0, poolIndex*r.step)),
                    poolIndex: poolIndex,
                }
            },
        }
        r.pools[poolIndex] = pool
    }
    r.mux.Unlock()
    return pool.Get().(*Buffer)
}

// Put 归还字节切片
func (r *BytesPool) Put(b *Buffer) {
    //超过内存池管理的尺寸,不复用,避免长期占用内存
    if b.poolIndex == -1 {
        return
    }
    b.Reset()
    wantIndex := (b.Cap() + r.step - 1) / r.step
    r.mux.RLock()
    if wantIndex == b.poolIndex {
        //内存尺寸没有扩张,直接归还
        pool := r.pools[wantIndex]
        r.mux.RUnlock()
        pool.Put(b)
        return
    }
    //内存尺寸已扩张,检查扩张后对应的pool是否存在
    pool, ok := r.pools[wantIndex]
    if ok {
        r.mux.RUnlock()
        //内存尺寸扩张后,对应的pool存在,说明已经有需求创建更大的内存尺寸,直接归还新的池子中进行复用
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    //内存尺寸扩张后,对应的pool不存在,说明当前没有创建更大内存尺寸的需求,继续归还到原来的池子中,以便后续复用
    pool = r.pools[b.poolIndex]
    r.mux.RUnlock()
    pool.Put(b)
}

既然有maxCapacity限制,那么就可以计算出最大的poolIndex,所以可以用slice代替map[int]*sync.Pool,从而去掉锁的开销。

第四个版本

package memoryPool

import (
    "bytes"
    "sync"
    "sync/atomic"
)

type syncPool struct {
    sync.Pool
    used *int32
}

// BytesPool 字节切片的多级内存池
type BytesPool struct {
    pools        []*syncPool
    maxPoolIndex int
    step         int
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool 创建字节切片的多级内存池
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    maxPoolIndex := (maxCapacity + step - 1) / step
    pools := make([]*syncPool, maxPoolIndex+1)
    for i := 1; i <= maxPoolIndex; i++ {
        ix := i
        p := syncPool{
            Pool: sync.Pool{
                New: func() any {
                    return &Buffer{
                        Buffer:    bytes.NewBuffer(make([]byte, 0, ix*step)),
                        poolIndex: ix,
                    }
                },
            },
            used: new(int32),
        }
        pools[i] = &p
    }
    return &BytesPool{pools: pools, step: step, maxPoolIndex: maxPoolIndex}
}

// Get 获取字节切片
func (r *BytesPool) Get(capacity int) *Buffer {
    poolIndex := (capacity + r.step - 1) / r.step
    if poolIndex > r.maxPoolIndex {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    atomic.StoreInt32(r.pools[poolIndex].used, 1)
    return r.pools[poolIndex].Get().(*Buffer)
}

// Put 归还字节切片
func (r *BytesPool) Put(b *Buffer) {
    //超过内存池管理的尺寸,不复用,避免长期占用内存
    wantIndex := (b.Cap() + r.step - 1) / r.step
    if wantIndex > r.maxPoolIndex {
        return
    }
    b.Reset()
    //内存尺寸没有扩张,直接归还
    if wantIndex == b.poolIndex {
        r.pools[wantIndex].Put(b)
        return
    }
    //内存尺寸已扩张,检查是否有创建更大的内存尺寸的需求,有则直接归还新的池子中进行复用
    pool := r.pools[wantIndex]
    if atomic.LoadInt32(pool.used) == 1 {
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    //当前没有创建更大内存尺寸的需求,继续归还到原来的池子中,以便后续复用
    r.pools[b.poolIndex].Put(b)
}

这一版引入used标记是否有创建更大的内存尺寸的需求,从而决定在内尺寸扩大后如何归还到内存池中。这个used是一次性标记的,如果标记一次后,后续再也没有此类尺寸的内存需求,则又会导致归还到一个低频使用池中。

第五个版本

package memoryPool

import (
    "bytes"
    "sync"
    "sync/atomic"
    "time"
)

type syncPool struct {
    sync.Pool
    lastUsed *uint32
}

// BytesPool 字节切片的多级内存池
type BytesPool struct {
    pools        []*syncPool
    maxPoolIndex int
    step         int
}

type Buffer struct {
    *bytes.Buffer
    poolIndex int
}

// NewBytesPool 创建字节切片的多级内存池
func NewBytesPool(step int, maxCapacity int) *BytesPool {
    if step <= 0 || maxCapacity <= 0 {
        panic("step and maxCapacity must be positive")
    }
    maxPoolIndex := (maxCapacity + step - 1) / step
    pools := make([]*syncPool, maxPoolIndex+1)
    for i := 1; i <= maxPoolIndex; i++ {
        ix := i
        p := syncPool{
            Pool: sync.Pool{
                New: func() any {
                    return &Buffer{
                        Buffer:    bytes.NewBuffer(make([]byte, 0, ix*step)),
                        poolIndex: ix,
                    }
                },
            },
            lastUsed: new(uint32),
        }
        pools[i] = &p
    }
    return &BytesPool{pools: pools, step: step, maxPoolIndex: maxPoolIndex}
}

// Get 获取字节切片
func (r *BytesPool) Get(capacity int) *Buffer {
    if capacity <= 0 {
        capacity = 1
    }
    poolIndex := (capacity + r.step - 1) / r.step
    if poolIndex > r.maxPoolIndex {
        return &Buffer{
            Buffer:    bytes.NewBuffer(make([]byte, 0, capacity)),
            poolIndex: -1,
        }
    }
    atomic.StoreUint32(r.pools[poolIndex].lastUsed, uint32(time.Now().Unix()))
    return r.pools[poolIndex].Get().(*Buffer)
}

// Put 归还字节切片
func (r *BytesPool) Put(b *Buffer) {
    //超过内存池管理的尺寸,不复用,避免长期占用内存
    wantIndex := (b.Cap() + r.step - 1) / r.step
    if wantIndex > r.maxPoolIndex {
        return
    }
    b.Reset()
    //未扩容,直接归还
    if wantIndex == b.poolIndex {
        r.pools[wantIndex].Put(b)
        return
    }
    //扩容了:检查目标池是否“近期”被使用过(如 120 秒内)
    pool := r.pools[wantIndex]
    if uint32(time.Now().Unix())-atomic.LoadUint32(pool.lastUsed) <= 120 {
        b.poolIndex = wantIndex
        pool.Put(b)
        return
    }
    //否则归还原池
    r.pools[b.poolIndex].Put(b)
}

这一版本加入了检查目标池是否“近期”被使用过,如果被使用过则说明是一个高频的pool,直接归还,否则是一个低频的pool,归还到原pool中。

本作品采用《CC 协议》,转载必须注明作者和本文链接
梦想星辰大海
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!