利用 file 相关操作写一个效率还可以的文件队列

写一个文件队列


const (
    blockLen int64 = 64
    // header
    headOffset = 0
    // version
    data0offset = headOffset
    // is blockLen
    data1offset = 8
    // is offset
    data2offset       = 16
    headLen     int64 = 64
)

type fqm struct {
    queueDir    string
    drLock      sync.Mutex
    queueHandle *os.File
    headerLen   int64
    header      struct {
        version  int64
        blockLen int64
        offset   int64
    }
}

func (itself *fqm) VACUUM() error {
    itself.drLock.Lock()
    itself.drLock.Unlock()
    var err error
    arms.IsExistOrCreate(itself.getQueueTmpPath(), "")
    tmpQueueHandle, err := os.OpenFile(itself.getQueueTmpPath(), os.O_RDWR, 0666)
    if err != nil {
        return err
    }
    // 迁移头
    header := make([]byte, 64)
    if _, err = itself.queueHandle.ReadAt(header, 0); err != nil {
        return err
    }
    if _, err = tmpQueueHandle.WriteAt(header, 0); err != nil {
        return err
    }
    mDataLen := 1024 * 1024
    blockData := make([]byte, mDataLen)
    var i int64
    // 迁移剩余队列
    for {

        lastN, _ := itself.queueHandle.ReadAt(blockData, itself.header.offset*itself.header.blockLen+headLen+i*int64(mDataLen))

        if lastN < mDataLen {
            lastData := make([]byte, lastN)
            for di := 0; di < lastN; di++ {
                lastData[di] = blockData[di]
            }
            if _, err = tmpQueueHandle.WriteAt(lastData, headLen+i*int64(mDataLen)); err != nil {
                return err
            }
            break
        } else {
            if _, err = tmpQueueHandle.WriteAt(blockData, headLen+i*int64(mDataLen)); err != nil {
                return err
            }
        }

        i += 1
    }
    // 新队列重制偏移量
    itself.header.offset = 0
    _, err = tmpQueueHandle.WriteAt(Int64ToBytes(itself.header.offset), data2offset)
    if err != nil {
        return err
    }
    _ = itself.queueHandle.Close()
    if err = os.Remove(itself.getQueuePath()); err != nil {
        return err
    }
    _ = tmpQueueHandle.Close()
    if err = os.Rename(itself.getQueueTmpPath(), itself.getQueuePath()); err != nil {
        return err
    }
    itself.queueHandle, err = os.OpenFile(itself.getQueuePath(), os.O_RDWR, 0666)

    if err != nil {
        return err
    }
    return nil
    // new
    // read old
    // write new
    // delete new
}

func (itself *fqm) getQueuePath() string {
    return itself.queueDir + "/1_000_000_000.q"
}
func (itself *fqm) getQueueTmpPath() string {
    return itself.queueDir + "/1_000_000_000.q.tmp"
}

func (itself *fqm) init() error {
    var err error
    arms.IsExistOrCreate(itself.getQueuePath(), "")
    itself.queueHandle, err = os.OpenFile(itself.getQueuePath(), os.O_RDWR, 0666)
    if err != nil {
        return err
    }
    headerData := make([]byte, headLen)
    n, err := itself.queueHandle.ReadAt(headerData, data1offset)
    if n == 0 {
        _, err = itself.queueHandle.Write(make([]byte, headLen))
        _, err = itself.queueHandle.WriteAt(Int64ToBytes(itself.header.version), data0offset)
        _, err = itself.queueHandle.WriteAt(Int64ToBytes(itself.header.blockLen), data1offset)
        _, err = itself.queueHandle.WriteAt(Int64ToBytes(itself.header.offset), data2offset)
    } else {

        blockLenData := Int64ToBytes(itself.header.blockLen)
        _, err = itself.queueHandle.ReadAt(blockLenData, data1offset)
        itself.header.blockLen = BytesToInt64(blockLenData)

        offsetData := Int64ToBytes(itself.header.offset)
        _, err = itself.queueHandle.ReadAt(offsetData, data2offset)
        itself.header.offset = BytesToInt64(offsetData)
    }
    return err
}

func (itself *fqm) push(data string) error {
    itself.drLock.Lock()
    defer itself.drLock.Unlock()
    i := 0
    // 有效表示位
    unitData := make([]byte, blockLen)
    unitData[i] = 1
    i += 1

    dataByte := []byte(data)
    if len(dataByte) > 55 {
        return errors.New("当前数据长度超过最大长度")
    }
    dataLenByte := Int64ToBytes(int64(len(dataByte)))

    for _, item := range dataLenByte {
        unitData[i] = item
        i += 1
    }
    for _, item := range dataByte {
        unitData[i] = item
        i += 1
    }
    n, _ := itself.queueHandle.Seek(0, io.SeekEnd)
    _, err := itself.queueHandle.WriteAt(unitData, n)
    return err
}

func (itself *fqm) pop() (string, error) {
    itself.drLock.Lock()
    defer itself.drLock.Unlock()
    dataLenByte := Int64ToBytes(int64(0))
    blockOffset := itself.header.offset*itself.header.blockLen + headLen
    // 验证位
    //vIndex = index
    // 数据长度位
    lIndex := blockOffset + 1
    // 数据起始位
    dataIndex := blockOffset + 1 + int64(len(dataLenByte))

    _, err := itself.queueHandle.ReadAt(dataLenByte, lIndex)
    if err != nil {
        return "", err
    }

    lLen := BytesToInt64(dataLenByte)
    data := make([]byte, lLen)
    _, err = itself.queueHandle.ReadAt(data, dataIndex)
    if err != nil {
        return "", err
    }

    itself.header.offset += 1
    _, err = itself.queueHandle.WriteAt(Int64ToBytes(itself.header.offset), data2offset)
    if err != nil {
        return "", err
    }

    return cast.ToString(data), nil
}

func fqmStd(dirPath string) (*fqm, error) {
    tmp := fqm{queueDir: dirPath, header: struct {
        version  int64
        blockLen int64
        offset   int64
    }{version: 1, blockLen: 64, offset: 0}}
    err := tmp.init()
    return &tmp, err
}

func Int64ToBytes(i int64) []byte {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, uint64(i))
    return buf
}

func BytesToInt64(buf []byte) int64 {
    if len(buf) < 8 {
        buf = append(make([]byte, 8-len(buf)), buf...)
    }
    return int64(binary.BigEndian.Uint64(buf))
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
biubiubiu
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!