为什么这个内存cache使用起来效果不理想

package cache

import (
    "sync"
    "time"
)

type Item struct {
    value      interface{}
    expiration int64
}

type Cache struct {
    items             map[string]Item
    mu                sync.RWMutex
    defaultExpiration time.Duration
    cleanupInterval   time.Duration
}

func NewCache(defaultExpiration, cleanupInterval time.Duration) *Cache {
    cache := &Cache{
        items:             make(map[string]Item),
        defaultExpiration: defaultExpiration,
        cleanupInterval:   cleanupInterval,
    }
    go cache.cleanupExpired()
    return cache
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()

    var exp int64
    now := time.Now().UnixNano()
    if expiration > 0 {
        exp = now + int64(expiration)
    } else {
        exp = now + int64(c.defaultExpiration)
    }

    item := Item{
        value:      value,
        expiration: exp,
    }
    c.items[key] = item
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    item, found := c.items[key]
    if !found {
        return nil, false
    }
    if time.Now().UnixNano() > item.expiration {
        c.mu.Lock()
        defer c.mu.Unlock()
        delete(c.items, key)
        return nil, false
    }
    return item.value, true
}

func (c *Cache) cleanupExpired() {
    for {
        time.Sleep(c.cleanupInterval)
        now := time.Now().UnixNano()

        c.mu.Lock()
        for key, item := range c.items {
            if now > item.expiration {
                delete(c.items, key)
            }
        }
        c.mu.Unlock()
    }
}
func TestCache1(t *testing.T) {

    cache := NewCache(2*time.Second, 2*time.Second)

    start := time.Now()
    for i := 1; i < 9999999; i++ {
        cache.Set(fmt.Sprintf("%d", i), cast.ToString(i), 2*time.Second)
        //if i%2 == 0 {
        //    endTime := time.Now()
        //    duration := endTime.Sub(start)
        //    if duration.Milliseconds() > 100 {
        //        fmt.Println("timeUnit", duration.Milliseconds(), "ms")
        //    }
        //    start = time.Now()
        //}
        if i%100000 == 0 {
            var m runtime.MemStats
            runtime.ReadMemStats(&m)
            fmt.Println(cast.ToString(m.Alloc/1024/1024)+"MB",
                cast.ToString(m.TotalAlloc/1024/1024)+"MB")
        }
    }
    endTime := time.Now()
    duration := endTime.Sub(start)
    fmt.Println("timeUnit", duration.Milliseconds(), "ms")

}
1551MB 1940MB
1555MB 1944MB
timeUnit 5759 ms

还有一个基于sync.map的

package cache

import (
    "sync"
    "time"
)

//var cacheStd = NewCache(time.Second*5, time.Second*10)

type Item struct {
    value      interface{}
    expiration int64
}

type Cache struct {
    items             sync.Map
    defaultExpiration time.Duration
    cleanupInterval   time.Duration
}

func NewCache(defaultExpiration, cleanupInterval time.Duration) *Cache {
    cache := &Cache{
        defaultExpiration: defaultExpiration,
        cleanupInterval:   cleanupInterval,
    }
    go cache.cleanupExpired()
    return cache
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
    var exp int64
    now := time.Now().UnixNano()
    if expiration > 0 {
        exp = now + int64(expiration)
    } else {
        exp = now + int64(c.defaultExpiration)
    }

    item := Item{
        value:      value,
        expiration: exp,
    }
    c.items.Store(key, item)
}

func (c *Cache) Get(key string) (interface{}, bool) {
    item, found := c.items.Load(key)
    if !found {
        return nil, false
    }
    cachedItem := item.(Item)
    if time.Now().UnixNano() > cachedItem.expiration {
        c.items.Delete(key)
        return nil, false
    }
    return cachedItem.value, true
}

func (c *Cache) cleanupExpired() {
    for {
        time.Sleep(c.cleanupInterval)
        now := time.Now().UnixNano()

        c.items.Range(func(key, value interface{}) bool {
            item := value.(Item)
            if now > item.expiration {
                c.items.Delete(key)
            }
            return true
        })
    }
}

//func GetFromCache[T any](key string, action func() T) T {
//    data, ok := cacheStd.Get(key)
//    if ok {
//        return data.(T)
//    }
//    res := action()
//    cacheStd.Set(key, res, 0)
//    return res
//}

测试结果差了一倍

2461MB 3114MB
2473MB 3126MB
timeUnit 12503 ms

想对应的java代码

package com.example.jtool.controller;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Cache {
    private ConcurrentHashMap<String, Item> items;
    private long defaultExpiration;
    private long cleanupInterval;
    private ScheduledExecutorService executor;

    public Cache(long defaultExpiration, long cleanupInterval) {
        this.items = new ConcurrentHashMap<>();
        this.defaultExpiration = defaultExpiration;
        this.cleanupInterval = cleanupInterval;
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.executor.scheduleAtFixedRate(this::cleanupExpired, cleanupInterval, cleanupInterval, TimeUnit.NANOSECONDS);
    }

    public void set(String key, Object value, long expiration) {
        long exp = expiration > 0 ? System.nanoTime() + expiration : System.nanoTime() + defaultExpiration;
        Item item = new Item(value, exp);
        items.put(key, item);
    }

    public Object get(String key) {
        Item item = items.get(key);
        if (item == null || System.nanoTime() > item.getExpiration()) {
            items.remove(key);
            return null;
        }
        return item.getValue();
    }

    private void cleanupExpired() {
        long now = System.nanoTime();
        items.forEach((key, value) -> {
            Item item = value;
            if (now > item.expiration) {
                items.remove(key);
            }
        });
    }


    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        // 在这里放置需要测量时间的代码

        Cache cache = new Cache(2000000000L, 20000000000L); // 5 seconds, 10 seconds
        for (Integer i = 1; i < 9999999; i++ ){
            cache.set(i.toString(), i.toString(), 2000000000L);

            if( i%100000 == 0 ){
                Runtime runtime = Runtime.getRuntime();
                long memoryUsed = runtime.totalMemory() - runtime.freeMemory();
                System.out.println("Memory used: " + memoryUsed/1024/1024 + "MB");
            }
        }
        System.out.println("end");

        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        System.out.println("程序运行时间:" + elapsedTime + " 毫秒");
    }
}

class Item {
    private Object value;
    public long expiration;

    public Item(Object value, long expiration) {
        this.value = value;
        this.expiration = expiration;
    }

    public Object getValue() {
        return value;
    }

    public long getExpiration() {
        return expiration;
    }
}
Memory used: 1632MB
Memory used: 1648MB
Memory used: 1664MB
Memory used: 1680MB
Memory used: 1680MB
end
程序运行时间:3020 毫秒

更加不能理解的是,go版本的cache测试过程中会有明显的阻塞感。有时候可能达到几十上百。有没有同学清楚go版本的cache哪里写的有问题

本作品采用《CC 协议》,转载必须注明作者和本文链接
biubiubiu
讨论数量: 8

测试用例不符合实际场景,既没考虑并发,又没考虑读,你试试下面这个

func TestCache2(t *testing.T) {

    cache := NewSyncCache(2*time.Second, 2*time.Second)

    start := time.Now()
    wg := sync.WaitGroup{}
    var m runtime.MemStats

    //一千并发
    for c := 1; c < 1000; c++ {
        wg.Add(1)
        go func(c int) {
            defer wg.Done()

            //每个并发写10万数据
            for i := 1; i < 100000; i++ {
                cache.Set(fmt.Sprintf("%d", i), i, 5*time.Second)
                if c == 1 && i%10000 == 0 {
                    runtime.ReadMemStats(&m)
                    fmt.Println(cast.ToString(m.Alloc/1024/1024)+"MB",
                        cast.ToString(m.TotalAlloc/1024/1024)+"MB")
                }
            }

            //每个并发读11万数据(多1万是要考虑到未命中场景)
            for i := 1; i < 110000; i++ {
                _, _ = cache.Get(fmt.Sprintf("%d", i))
                if c == 1 && i%10000 == 0 {
                    runtime.ReadMemStats(&m)
                    fmt.Println(cast.ToString(m.Alloc/1024/1024)+"MB",
                        cast.ToString(m.TotalAlloc/1024/1024)+"MB")
                }
            }
        }(c)
    }
    wg.Wait()
    runtime.ReadMemStats(&m)
    fmt.Println(cast.ToString(m.Alloc/1024/1024)+"MB",
        cast.ToString(m.TotalAlloc/1024/1024)+"MB")
    endTime := time.Now()
    duration := endTime.Sub(start)
    fmt.Println("timeUnit", duration.Milliseconds(), "ms")
}
4周前 评论
滚球兽进化 (楼主) 4周前

用开源库吧,自己造轮子没必要

4周前 评论
Oraoto

sync.Map 适用读多写少的场景,这个测试很不利。

delete(map, k) 是不会释放 map 自身占用的内存的。所以用 map 做缓存你需要定期复制 key value 到一个新的 map 里,或者 clear 全清空。

测试里过期时间和清理时间都是 2 秒,那么在第 2 秒的时候可能只清理了很少量的 key。因为 Java 插入性能好,整个测试只跑了 3 秒左右,只清了一次, cleanupExpired 基本不影响总耗时。

Go map 版本,因为插入慢,跑了 5 秒,清理 2 次,而且第二次(第4秒)要清理0-2秒几百万个 key,cleanupExpired 消耗占比会更大。

如果提前指定 map 大小,减少扩容,插入也是 3 秒左右,不过实际应用中多半不会提前知道大小。

3周前 评论
flc1125
func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()

此处造成的阻塞影响很大。数据量越大,读写越频繁,锁的感觉也明显。

  • 方向一:可以考虑用 Tree 结构。所有数据存到这个 Tree 结构里,Gin 框架的路由设计用的是这个逻辑。优势就是可以减少锁的情况出现。但不足也有,就是如果数据在很深层,遍历IO会比较大,看怎么取舍。适合数据量小的。
  • 方向二:可以结合 key 做分桶,尽可能将大量的数据,分桶到N个map关系,这样,可以缩小锁的范围。举例有1000个桶,有个key的操作,也只会锁其中一个桶而已。理论上,你分的桶越多,锁的概率越低,但数据结构也会变得有点小复杂。。

没有完美的方案,基本属于空间/时间互换的逻辑。

3周前 评论

我个人认为 问题在 mutex ,看代码中 Cache 和 map公用一把锁,但是cleanupExpired 方法无时无刻都在进行时间检测,无时无刻都存在锁竞争关系。

优化建议

items map[string]Item 需要配一把单独的锁,用来给Get() Set() Delete()使用,Cache上暂时没发现持续变更字段,你的日期计算不会出现并发问题,所以外层Cache不需要锁试试。

1周前 评论
mengxin666 (作者) 1周前

上面有提到:

  • cache.cleanupExpired() 增加了锁的竞争,同时也遍历了整个 map。

    这里优化的话,可以理解一个问题,就是所有新加入的 cache 都有一个排序问题,就是总会执行到某个 key 时,剩余下的 key 都不用执行expire处理,所以这应该更像 时间轮(TimeWheel),采用 TimeWheel 来处理过期的 key,仅当 delayqueue 中有需要处理的 key 才进行加锁

  • Golang 中对于 map 的内存扩容机制,如果没有提前分配大概的长度,会根据装载因子进行扩容,需要重新申请内存,并且把旧的 key 迁移到新的 bucket 中,会占用时间

  • 增加桶以便减少锁的竞争,实际上实现起来比较复杂,实际到 如何 分配 key 到哪个 bucket,以及如果控制 bucket 数量,但这确实是优化的关键

    建议直接采用 轮子 github.com/allegro/bigcache

5天前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!