利用 file 相关操作写一个效率还可以的文件队列
写一个文件队列
// On-disk layout of the queue file: a fixed-size header followed by
// fixed-size blocks.
const (
	// headLen is the number of bytes reserved for the header.
	headLen int64 = 64
	// blockLen is the number of bytes occupied by one queue block.
	blockLen int64 = 64

	// headOffset is the position of the header inside the file.
	headOffset = 0
	// data0offset is the position of the version field (start of the header).
	data0offset = headOffset
	// data1offset is the position of the blockLen field.
	data1offset = 8
	// data2offset is the position of the offset field.
	data2offset = 16
)
// fqm is a queue persisted in a single file under queueDir.
type fqm struct {
	// queueDir is the directory that holds the queue file.
	queueDir string
	// drLock is the mutex taken by the queue operations.
	drLock sync.Mutex
	// queueHandle is the open handle of the queue file.
	queueHandle *os.File
	headerLen   int64
	// header is the in-memory copy of the header fields kept in the file.
	header struct {
		version, blockLen, offset int64
	}
}
// VACUUM compacts the queue file. It copies the header and every block that
// has not been popped yet into a temporary file, stores a read offset of zero
// in that file and then renames it over the queue file.
//
// The queue lock is held for the whole operation, so push and pop cannot run
// while the file is being rewritten. The first I/O error encountered is
// returned; the in-memory offset is only reset once the swap has succeeded.
func (itself *fqm) VACUUM() error {
	itself.drLock.Lock()
	defer itself.drLock.Unlock()

	queuePath := itself.getQueuePath()
	tmpPath := itself.getQueueTmpPath()

	arms.IsExistOrCreate(tmpPath, "")
	// O_TRUNC discards anything left behind by an earlier, interrupted run.
	tmpQueueHandle, err := os.OpenFile(tmpPath, os.O_RDWR|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	if err = itself.copyUnread(tmpQueueHandle); err != nil {
		_ = tmpQueueHandle.Close()
		return err
	}
	if err = tmpQueueHandle.Close(); err != nil {
		return err
	}

	// Release the old handle first: some platforms refuse to replace a file
	// that is still open. The handle was only read from, so a Close error
	// carries no data-loss risk and is deliberately ignored.
	_ = itself.queueHandle.Close()
	// Rename replaces an existing target, so no separate remove step is needed
	// and the queue path never disappears in between.
	if err = os.Rename(tmpPath, queuePath); err != nil {
		// The swap did not happen; reopen the untouched queue file so the
		// queue stays usable.
		if handle, openErr := os.OpenFile(queuePath, os.O_RDWR, 0666); openErr == nil {
			itself.queueHandle = handle
		}
		return err
	}

	// The file on disk now starts at the first unread block.
	itself.header.offset = 0
	itself.queueHandle, err = os.OpenFile(queuePath, os.O_RDWR, 0666)
	return err
}

// copyUnread writes the current header (with its offset field zeroed) and all
// blocks from the current read offset onwards into dst.
func (itself *fqm) copyUnread(dst *os.File) error {
	// Migrate the header.
	header := make([]byte, headLen)
	if _, err := itself.queueHandle.ReadAt(header, headOffset); err != nil {
		return err
	}
	copy(header[data2offset:], Int64ToBytes(0))
	if _, err := dst.WriteAt(header, headOffset); err != nil {
		return err
	}

	// Migrate the remaining queue data in 1 MiB chunks.
	chunk := make([]byte, 1024*1024)
	readPos := itself.header.offset*itself.header.blockLen + headLen
	writePos := headLen
	for {
		n, readErr := itself.queueHandle.ReadAt(chunk, readPos)
		if n > 0 {
			if _, err := dst.WriteAt(chunk[:n], writePos); err != nil {
				return err
			}
			readPos += int64(n)
			writePos += int64(n)
		}
		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return readErr
		}
	}
}
// getQueuePath returns the path of the queue file inside queueDir.
func (itself *fqm) getQueuePath() string {
	const queueFileName = "1_000_000_000.q"
	return itself.queueDir + "/" + queueFileName
}
// getQueueTmpPath returns the path of the temporary file used by VACUUM:
// the queue file path with a ".tmp" suffix.
func (itself *fqm) getQueueTmpPath() string {
	return itself.getQueuePath() + ".tmp"
}
// init opens the queue file and brings the in-memory header and the header
// stored in the file in line with each other.
//
// When nothing can be read at the blockLen field the file is treated as new
// and the in-memory header is written to it. Otherwise blockLen and offset are
// loaded from the file (the stored version is not read back). Every I/O error
// is returned as soon as it occurs.
func (itself *fqm) init() error {
	var err error
	arms.IsExistOrCreate(itself.getQueuePath(), "")
	itself.queueHandle, err = os.OpenFile(itself.getQueuePath(), os.O_RDWR, 0666)
	if err != nil {
		return err
	}

	field := make([]byte, 8)
	n, err := itself.queueHandle.ReadAt(field, data1offset)
	if n == 0 && (err == nil || err == io.EOF) {
		// No header on disk yet: build it in memory and store it in one write.
		header := make([]byte, headLen)
		copy(header[data0offset:], Int64ToBytes(itself.header.version))
		copy(header[data1offset:], Int64ToBytes(itself.header.blockLen))
		copy(header[data2offset:], Int64ToBytes(itself.header.offset))
		_, err = itself.queueHandle.WriteAt(header, headOffset)
		return err
	}
	if err != nil {
		// Includes io.EOF for a header that is cut short.
		return err
	}
	itself.header.blockLen = BytesToInt64(field)

	if _, err = itself.queueHandle.ReadAt(field, data2offset); err != nil {
		return err
	}
	itself.header.offset = BytesToInt64(field)
	return nil
}
// push appends data to the end of the queue file as one fixed-size block.
//
// Block layout: 1 byte valid flag (set to 1), 8 bytes big-endian payload
// length, then the payload, zero-padded to blockLen. It returns an error when
// the payload does not fit into a block or when the file cannot be written.
func (itself *fqm) push(data string) error {
	itself.drLock.Lock()
	defer itself.drLock.Unlock()

	const (
		flagLen     = 1
		lenFieldLen = 8
	)
	if int64(len(data)) > blockLen-flagLen-lenFieldLen {
		return errors.New("当前数据长度超过最大长度")
	}

	unitData := make([]byte, blockLen)
	// Valid flag.
	unitData[0] = 1
	copy(unitData[flagLen:], Int64ToBytes(int64(len(data))))
	copy(unitData[flagLen+lenFieldLen:], data)

	// A failed Seek must not fall through: its zero result would make the
	// write below land on top of the header.
	end, err := itself.queueHandle.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	_, err = itself.queueHandle.WriteAt(unitData, end)
	return err
}
// pop returns the payload of the block at the current read offset and
// advances the offset by one block, both in the file header and in memory.
//
// It returns the read error (io.EOF when there is no block at the current
// offset) or an error when the stored payload length cannot belong to a block.
// The in-memory offset only moves once the new offset has been written to the
// file, so a failed pop can be retried.
func (itself *fqm) pop() (string, error) {
	itself.drLock.Lock()
	defer itself.drLock.Unlock()

	const (
		flagLen     = 1
		lenFieldLen = 8
	)
	blockOffset := itself.header.offset*itself.header.blockLen + headLen

	// Length field, stored right after the valid flag.
	dataLenByte := make([]byte, lenFieldLen)
	if _, err := itself.queueHandle.ReadAt(dataLenByte, blockOffset+flagLen); err != nil {
		return "", err
	}
	lLen := BytesToInt64(dataLenByte)
	// The length comes from disk; reject values that would panic in make or
	// reach past the end of the block.
	if lLen < 0 || lLen > itself.header.blockLen-flagLen-lenFieldLen {
		return "", errors.New("invalid record length in queue file")
	}

	// Payload, stored right after the length field.
	data := make([]byte, lLen)
	if _, err := itself.queueHandle.ReadAt(data, blockOffset+flagLen+lenFieldLen); err != nil {
		return "", err
	}

	next := itself.header.offset + 1
	if _, err := itself.queueHandle.WriteAt(Int64ToBytes(next), data2offset); err != nil {
		return "", err
	}
	itself.header.offset = next
	return cast.ToString(data), nil
}
// fqmStd builds a queue rooted at dirPath with the default header
// (version 1, 64-byte blocks, offset 0) and initialises it.
// The queue pointer is returned even when init reports an error.
func fqmStd(dirPath string) (*fqm, error) {
	queue := &fqm{queueDir: dirPath}
	queue.header.version = 1
	queue.header.blockLen = 64
	queue.header.offset = 0
	if err := queue.init(); err != nil {
		return queue, err
	}
	return queue, nil
}
// Int64ToBytes encodes i as 8 big-endian bytes.
func Int64ToBytes(i int64) []byte {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(i))
	return encoded[:]
}
// BytesToInt64 decodes a big-endian int64 from buf.
// A buffer shorter than 8 bytes is left-padded with zeros; for a longer
// buffer only the first 8 bytes are decoded.
func BytesToInt64(buf []byte) int64 {
	if len(buf) >= 8 {
		return int64(binary.BigEndian.Uint64(buf))
	}
	var padded [8]byte
	copy(padded[8-len(buf):], buf)
	return int64(binary.BigEndian.Uint64(padded[:]))
}
本作品采用《CC 协议》，转载必须注明作者和本文链接