有关 go 互斥锁和自旋锁的性能对比的疑惑

1. 运行环境

go version go1.18.5 windows/amd64

2. 问题描述?

最近需要做一个本地缓存的需求,缓存数据从 redis 读取,遂考虑到在大量并发的情况下,减少 go 协程切换的消耗,考虑用自旋锁来实现

实现的大概的思路是:

  • 大量并发请求
    • 首先获得自旋锁的 goroutine 上锁
    • 获得自旋锁的 goroutine 去 redis 读取数据到本地缓存
    • 其他未获得自旋锁的 goroutine 原地自旋等待缓存数据
  • 读取的协程读取到数据之后解锁
  • 未获得自旋锁的 goroutine 使用已加载到本地的缓存数据

但是我经过实际测试,发现用自旋锁的速度和用互斥锁的速度要差1~2个数量级,这让我很不理解?

之后我经过大量测试发现:

  • 自旋锁在 本地操作(值自增),阻塞操作(sleep 或者 channel recv)时,性能高于互斥锁
  • 在 HTTP 操作时性能和互斥锁相当
  • 在通过 “github.com/go-redis/redis/v8” 操作 redis 获取数据时,自旋锁的性能和互斥锁要差1个数量级

不知道是否有大佬可以解答一番?不吝赐教

3. 您期望得到的结果?

自旋锁的效率应该是高于互斥锁的

4. 您实际得到的结果?

自旋锁在操作 redis 时的效率要远远低于互斥锁

5. 代码实现如下

  • 自旋锁的实现
type SpinLocker uint32

func (sl *SpinLocker) Lock() {
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        runtime.Gosched()
    }
}

func (sl *SpinLocker) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}
  • 一些测试操作的实现
// 本地值自增操作

var localValue int

func operationLocalValue() {
    localValue++
}

// redis 读取缓存操作

var cache struct {
    value string
}

var redisCacheKey string = "go_foo_test_cache"
var redisCacheValue string = "Hello Spin Key"
var redisCtx context.Context = context.Background()
var redisClient *redis.Client = func() *redis.Client {
    client := redis.NewClient(func() *redis.Options {
        opt, err := redis.ParseURL("redis://:@127.0.0.1:6379")
        if err != nil {
            panic(err)
        }
        return opt
    }())
    _, err := client.Ping(redisCtx).Result()
    if err != nil {
        panic(err)
    }
    _, err = client.Set(redisCtx, redisCacheKey, redisCacheValue, time.Hour).Result()
    if err != nil {
        panic(err)
    }
    return client
}()

func loadCacheFromRedis() {
    cv, err := redisClient.Get(redisCtx, redisCacheKey).Result()
    if err != nil {
        panic(err)
    }
    cache.value = cv
}

// sleep 阻塞操作

func blockingGoroutine(d time.Duration) {
    time.Sleep(d)
}

// channel 阻塞操作

var channel chan int = make(chan int)

func channelSender(d time.Duration, max int) {
    t := time.NewTicker(d)
    defer t.Stop()
    c := 0
    for range t.C {
        channel <- 1
        c++
        if c > max {
            return
        }
    }
}

func channelReceiver() {
    <-channel
}

// http 请求操作

func httpServer() {
    http.HandleFunc("/sync/locker/foo", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("Hello Spin Locker"))
    })
    http.ListenAndServe("127.0.0.1:8000", nil)
}

func httpClient() {
    req, err := http.NewRequest("GET", "http://127.0.0.1:8000/sync/locker/foo", nil)
    if err != nil {
        panic(err)
    }
    req.Close = true
    c := http.Client{}
    _, err = c.Do(req)
    if err != nil {
        panic(err)
    }
}
  • 调用处
var spinLocker SpinLocker
var mutex sync.Mutex

// 以 HTTP 为例
func SpinLockerPerformanceOnHttpRequest(gCount int) {
    go httpServer()
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != 100; index++ {
        go func() {
            spinLocker.Lock()
            httpClient()
            spinLocker.Unlock()
            gp.Done()
        }()
    }
    gp.Wait()
}

func MutexLockerPerformanceOnHttpRequest(gCount int) {
    go httpServer()
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != 100; index++ {
        go func() {
            spinLocker.Lock()
            httpClient()
            spinLocker.Unlock()
            gp.Done()
        }()
    }
    gp.Wait()
}

// 以 redis 获取数据为例

func SpinLockerPerformanceOnLoadCacheFromRedis(gCount int) string {
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            spinLocker.Lock()
            loadCacheFromRedis()
            spinLocker.Unlock()
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.value
}

func MutexLockerPerformanceOnLoadCacheFromRedis(gCount int) string {
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            mutex.Lock()
            loadCacheFromRedis()
            mutex.Unlock()
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.value
}

为了免除泛型的影响,没有用泛型来写


经过评论中 singleflight 的提示,发现了我自己 在读取 redis 实现的问题,我没有从本地缓存读取,修改后的逻辑如下:

func SpinLockerPerformanceOnLoadCacheFromRedis(gCount int) (string, int32) {
    cache = struct {
        value       string
        holderCount int32
    }{}
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            spinLocker.Lock()
            if len(cache.value) != 0 {
                spinLocker.Unlock()
                goto USE_CACHE
            }
            loadCacheFromRedis()
            spinLocker.Unlock()
        USE_CACHE:
            atomic.AddInt32(&cache.holderCount, 1)
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.value, cache.holderCount
}

func MutexLockerPerformanceOnLoadCacheFromRedis(gCount int) (string, int32) {
    cache = struct {
        value       string
        holderCount int32
    }{}
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            if len(cache.value) != 0 {
                goto USE_CACHE
            }
            mutex.Lock()
            if len(cache.value) != 0 {
                mutex.Unlock()
                goto USE_CACHE
            }
            loadCacheFromRedis()
            mutex.Unlock()
        USE_CACHE:
            atomic.AddInt32(&cache.holderCount, 1)
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.value, cache.holderCount
}

// 新增 singleflight 的形式

func getValueFromRedisByKey(k string) string {
    v, err := redisClient.Get(redisCtx, k).Result()
    if err != nil {
        panic(err)
    }
    return v
}

var gsf singleflight.Group

func SingleFlightPerformanceOnLoadCacheFromRedis(gCount int) (string, int32) {
    cache = struct {
        value       string
        holderCount int32
    }{}
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            _, err, _ := gsf.Do(redisCacheKey, func() (interface{}, error) {
                cache.value = getValueFromRedisByKey(redisCacheKey)
                return cache.value, nil
            })
            if err != nil {
                panic(err)
            }
            atomic.AddInt32(&cache.holderCount, 1)
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.value, cache.holderCount
}

修改后的性能对比

  • mutex = singleflight 优于 spinlocker
  • mutex 的效率和 spinlocker 不再有数量级的差距,mutex 的效率是 spinlocker 的 3~4 倍

至此,仍然存有疑惑,为什么自旋锁的性能在这种场景下没有优于互斥锁?


再次测试后的对比

1w 协程内
结论仍然一样
65535 协程时
mutex = spinlocker 优于 singleflight
10w 协程时
mutex 略优于 spinlocker 优于 singleflight


感谢其他大佬提出的 go-redis 配套缓存库 “github.com/go-redis/cache/v8”
不过经过对该库的测试,发现其性能并不如直接使用 mutex
使用方式如下:

func RedisV8CachePerformanceOnLoadCacheFromRedis(gCount int) int32 {
    cacheHandler := redisCache.New(&redisCache.Options{
        Redis:      redisClient,
        LocalCache: redisCache.NewTinyLFU(1000, time.Hour),
    })
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            var v string
            err := cacheHandler.Get(redisCtx, redisCacheKey, &v)
            if err != nil {
                panic(err)
            }
            if v != redisCacheValue {
                panic("result wrong")
            }
            atomic.AddInt32(&cache.holderCount, 1)
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.holderCount
}

debug 其源代码,发现 Cache.getBytes 这个方法存在“缓存穿透”的问题,不知是否是我使用的方法不对,如果不对,请指出


经过大佬提示,应该用 Cache.Once 方法,修改后如下:

func RedisV8CacheOncePerformanceOnLoadCacheFromRedis(gCount int) int32 {
    cacheHandler := redisCache.New(&redisCache.Options{
        Redis:      redisClient,
        LocalCache: redisCache.NewTinyLFU(1000, time.Hour),
    })
    gp := sync.WaitGroup{}
    gp.Add(gCount)
    for index := 0; index != gCount; index++ {
        go func() {
            var v string
            err := cacheHandler.Once(&redisCache.Item{
                Ctx:   redisCtx,
                Key:   redisCacheKey,
                Value: &v,
                Do: func(i *redisCache.Item) (interface{}, error) {
                    return i.Value, nil
                },
            })
            if err != nil {
                panic(err)
            }
            if v != redisCacheValue {
                panic("result wrong")
            }
            atomic.AddInt32(&cache.holderCount, 1)
            gp.Done()
        }()
    }
    gp.Wait()
    return cache.holderCount
}

实际性能和 singleflight 的结论差不多,结论如下:

// PerformanceOnLoadCacheFromRedis

// 100 g
// - MutexLocker
// 3.853s 35118op       33606 ns/op        1618 B/op         101 allocs/op
// - go-redis/cache/v8 Once
// 2.828s 966op       1283387 ns/op      205865 B/op         439 allocs/op
// - SpinLocker
// 4.404s 350op       4841889 ns/op        1861 B/op         107 allocs/op

// 65535 g
// - MutexLocker
// 4.696s 72op      17069562 ns/op     1054311 B/op       65564 allocs/op
// - go-redis/cache/v8 Once
// 3.588s 46op      26429683 ns/op     9519285 B/op      264817 allocs/op
// - SpinLocker
// 3.669s 73op      18067181 ns/op     1168958 B/op       65810 allocs/op

// 10w g
// - MutexLocker
// 3.417s 42op      25607279 ns/op     1604059 B/op      100029 allocs/op
// - go-redis/cache/v8 Once
// 3.533s 32op      35686584 ns/op    14245437 B/op      402675 allocs/op
// - SpinLocker
// 3.543s 45op      25656558 ns/op     1701715 B/op      100233 allocs/op

但仍然没有解决“为什么在这个场景下,自旋锁的性能不如互斥锁”的疑问


具体的压力测试代码在这里

Mericustar
讨论数量: 23
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        runtime.Gosched()
}

非大佬,你这个写的看着不像是自旋呃,而是让出协程,再次执行的时间取决于go的调度。按你说的场景,感觉可以用singleflight包

1年前 评论
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前

既然你都用github.com/go-redis/redis/v8

为什么不顺便用配套的github.com/go-redis/cache/v8

1年前 评论
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前
renxiaotu (作者) 1年前
renxiaotu (作者) 1年前
Mericustar (楼主) 1年前
renxiaotu (作者) 1年前
Mericustar (楼主) 1年前

这个是官方对于Once方法的示例,同一个实例同一时间只会有一个调用,其它调用会等待首个调用执行完成并获取其结果

func Example_advancedUsage() {
    ring := redis.NewRing(&redis.RingOptions{
        Addrs: map[string]string{
            "server1": ":6379",
            "server2": ":6380",
        },
    })

    mycache := cache.New(&cache.Options{
        Redis:      ring,
        LocalCache: cache.NewTinyLFU(1000, time.Minute),
    })

    obj := new(Object)
    err := mycache.Once(&cache.Item{
        Key:   "mykey",
        Value: obj, // destination
        Do: func(*cache.Item) (interface{}, error) {
            return &Object{
                Str: "mystring",
                Num: 42,
            }, nil
        },
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(obj)
    // Output: &{mystring 42}
}
1年前 评论
Mericustar (楼主) 1年前
Mericustar (楼主) 1年前

事实上Once用的就是singleflight,只是帮你封装好了

file

1年前 评论
Mericustar (楼主) 1年前
renxiaotu (作者) 1年前
Mericustar (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!