Arena 内存管理

原文地址:Arena 内存管理 - LynxDB 数据库官网
Gitee地址:gitee.com/bailizhang/lynxdb
GitHub地址:github.com/baili-zhang/lynxdb

因为如果每个客户端请求都向堆申请内存,那么随着请求越来越多,堆中的内存碎片也会越来越多,从而导致需要 GC 的内存空间越来越多。虽然,ZGC 的性能已经非常优秀了,但是还是让 GC 尽量少工作一些,以避免系统延迟抖动和垃圾收集时的 CPU 消耗。

Arena 类

作用:直接分配一大块内存,申请内存时,每次只能申请固定大小的内存。申请的内存用位图记录,如果某块内存被申请了,那么该内存对应的 bit 被设置为 true。如果该块内存没有被申请,则设置为 false。

public class Arena {
    private final ByteBuffer buffer;
    private final BitSet bitSet;
    private final int allocSize;
    private final int bufferCount;
    private int maxClearBit;

    public Arena(int mainBufferSize, int allocBufferSize) {
        allocSize = allocBufferSize;

        buffer = ByteBuffer.allocateDirect(mainBufferSize);
        bufferCount = mainBufferSize/ allocBufferSize;

        if(mainBufferSize % allocBufferSize != 0) {
            throw new RuntimeException();
        }

        bitSet = new BitSet(bufferCount);
        maxClearBit = 0;
    }

    public synchronized ArenaBuffer alloc() throws ArenaOverflowException {
        // 因为 alloc 内存并不是频繁发生,所以直接使用 cardinality() 计算
        if(bitSet.cardinality() == bufferCount) {
            throw new ArenaOverflowException();
        }

        int nextClearBit = bitSet.nextClearBit(maxClearBit);
        bitSet.set(nextClearBit);
        maxClearBit = (nextClearBit + 1) % bufferCount;

        ByteBuffer allocBuffer = buffer.slice(nextClearBit * allocSize, allocSize);
        return new ArenaBuffer(nextClearBit, allocBuffer);
    }

    public synchronized void dealloc(ArenaBuffer arenaBuffer) {
        int bit = arenaBuffer.bit();
        bitSet.clear(bit);
    }
}

ArenaBuffer 类

作用:ArenaBuffer 是 Arena 分配的固定大小的内存。ArenaBuffer 可以再次被分配成不固定大小的 Segment 块。

public class ArenaBuffer {
    private final int bit;
    private final ByteBuffer buffer;
    private final BitSet bitSet;

    public ArenaBuffer(int bit, ByteBuffer buffer) {
        this.bit = bit;
        this.buffer = buffer;
        this.bitSet = new BitSet(buffer.limit());
    }

    public int bit() {
        return bit;
    }

    public int position() {
        return buffer.position();
    }

    public synchronized void read(SocketChannel channel) throws IOException {
        int oldPosition = buffer.position();
        channel.read(buffer);
        int newPosition = buffer.position();

        if(oldPosition == newPosition) {
            return;
        }

        bitSet.set(oldPosition, newPosition, true);
    }

    public synchronized boolean notFull() {
        return BufferUtils.isNotOver(buffer);
    }

    public synchronized boolean isClear() {
        return bitSet.isEmpty();
    }

    public synchronized Segment alloc(int offset, int length) {
        bitSet.set(offset, offset + length, true);
        ByteBuffer segmentBuffer = buffer.slice(offset, length).asReadOnlyBuffer();
        return new Segment(this, offset, length, segmentBuffer);
    }

    public synchronized void dealloc(Segment segment) {
        int offset = segment.offset();
        int length = segment.length();
        bitSet.clear(offset, offset + length);
    }
}

Segment 类

作用:Segment 是只读的 Buffer,主要是由业务逻辑使用。

public record Segment(
        ArenaBuffer parent,
        int offset,
        int length,
        ByteBuffer buffer
) {
    public static void deallocAll(Segment[] segments) {
        Arrays.stream(segments).forEach(Segment::dealloc);
    }

    public static Buffers buffers(Segment[] segments) {
        ByteBuffer[] buffers = Arrays.stream(segments)
                .map(Segment::buffer)
                .toArray(ByteBuffer[]::new);
        return new Buffers(buffers);
    }

    public void dealloc() {
        parent.dealloc(this);
    }
}

Arena 在 LynxDB Socket 中的使用

ArenaAllocator

作用:ArenaAllocator 是一个单例的 Arena 内存分配器。

public class ArenaAllocator {
    public static final int ARENA_BUFFER_SIZE = 1024 * 8;
    private static final Arena arena = new Arena(1024 * 1024 * 1024, ARENA_BUFFER_SIZE);
    public static ArenaBuffer alloc() {
        try {
            return arena.alloc();
        } catch (ArenaOverflowException ignored) {
            // TODO 处理 Arena 溢出问题
            throw new RuntimeException();
        }
    }

    public static void dealloc(ArenaBuffer arenaBuffer) {
        arena.dealloc(arenaBuffer);
    }
}

ArenaBufferManager

作用:管理从 Arena 中分配的内存,并按照一定的规则生成 Segement。

public class ArenaBufferManager {
    private final List<ArenaBuffer> arenaBuffers = new LinkedList<>();
    private volatile int position = 0;

    public ArenaBuffer readableArenaBuffer() {
        if(arenaBuffers.isEmpty()) {
            ArenaBuffer newArenaBuffer = ArenaAllocator.alloc();
            arenaBuffers.addLast(newArenaBuffer);
            return newArenaBuffer;
        }

        ArenaBuffer arenaBuffer = arenaBuffers.getLast();
        if(arenaBuffer.notFull()) {
            return arenaBuffer;
        }

        ArenaBuffer newArenaBuffer = ArenaAllocator.alloc();
        arenaBuffers.addLast(newArenaBuffer);
        return newArenaBuffer;
    }

    public void dealloc() {
        arenaBuffers.forEach(ArenaAllocator::dealloc);
    }

    public boolean notEnoughToRead(int length) {
        if(arenaBuffers.isEmpty()) {
            throw new RuntimeException();
        }

        int size = arenaBuffers.size();
        int total = arenaBuffers.getLast().position()
                + (size - 1) * ArenaAllocator.ARENA_BUFFER_SIZE;

        return position + length > total;
    }

    public Segment[] read(int length, boolean isPositionChange) {
        // 检查数据足够吗
        if(notEnoughToRead(length)) {
            throw new RuntimeException();
        }

        int idx = position / ArenaAllocator.ARENA_BUFFER_SIZE;
        int size = arenaBuffers.size();

        List<Segment> segments = new ArrayList<>();
        int tempPosition = position, remainingLength = length;
        for(int i = idx; i < size && remainingLength > 0; i ++) {
            ArenaBuffer arenaBuffer = arenaBuffers.get(i);
            int bufferPosition = arenaBuffer.position();

            int readPosition = tempPosition % ArenaAllocator.ARENA_BUFFER_SIZE;
            int readLength = Math.min(remainingLength, bufferPosition - readPosition);

            Segment segment = arenaBuffer.alloc(readPosition, readLength);
            segments.add(segment);

            remainingLength -= readLength;
            tempPosition += readLength;
        }

        if(isPositionChange) {
            position += length;
        }

        return segments.toArray(Segment[]::new);
    }

    public int readInt(boolean isPositionChange) {
        Segment[] segments = read(INT_LENGTH, isPositionChange);
        if(segments.length == 1) {
            int value = segments[0].buffer().getInt();
            // 返还分配的内存
            Segment.deallocAll(segments);
            return value;
        }

        ByteBuffer intBuffer = BufferUtils.intByteBuffer();
        for (Segment segment : segments) {
            intBuffer.put(segment.buffer());
        }
        // 返还分配的内存
        Segment.deallocAll(segments);
        return intBuffer.rewind().getInt();
    }

    public void incrementPosition(int length) {
        position += length;
    }

    public void clearFreeBuffers() {
        ArenaBuffer arenaBuffer;
        while (!arenaBuffers.isEmpty()
                && (arenaBuffer = arenaBuffers.getFirst()).isClear()) {
            ArenaAllocator.dealloc(arenaBuffer);
            arenaBuffers.removeFirst();
            position -= ArenaAllocator.ARENA_BUFFER_SIZE;
        }
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!