代码解决缓存穿透和缓存雪崩问题

前言

  • 当我们去使用 redis 组合 golang 时,看到网上有关很多缓存穿透和缓存雪崩的问题,但是真正组合在一起的代码比较少,其实可以去多参考他人的代码编写出自己的模板
  • 不过像这种已经成套体系,能用现成的才是最好的(笑)

什么是缓存穿透,缓存雪崩,缓存击穿

1.缓存雪崩

大量缓存在同一时间失效

  • 解决方法: 设置缓存时间为一定范围的随机数

2.缓存穿透

缓存和数据库中都不存在(请求数据没有被缓存拦截,一直都在找数据库,但是数据库没有,所以一直找)

  • 解决方法:当第一次命中时设置 该缓存 value 为 DISABLE ,之后每次都只会打到该缓存上

3.缓存击穿

缓存失效后,有某些 key 被超高并发地访问

  • 解决方法:使用互斥锁,有锁时,等待获取

golang 缓存穿透 + 缓存雪崩 + 缓存击穿解决方法

1.第三方包

  • 数据库映射框架 orm : gorm
  • 缓存连接工具: go-redis
  • 错误封装工具: pkg/errors

2.配置结构体 + 编写缓存 key + 缓存设置时间

// 首先选好一个数据库中的表
// 缓存选用的结构是 序列化后的 string   ---
// (hash 不选是因为hash 无法同时设置过期时间,防止缓存成为一个永久缓存 -- 哈哈我才不说是因为懒)
type CatUser struct {
    ID        uint      `gorm:"column:id" json:"id"`
    CreatedAt time.Time `gorm:"column:created_at" json:"createdAt"`
    UpdatedAt time.Time `gorm:"column:updated_at" json:"updatedAt"`
    UserId    int       `gorm:"column:user_id" json:"user_id"`
    Name      string    `gorm:"column:name" json:"name"`
    Password  string    `gorm:"cloumn:password" json:"password"`
}

// RedisKey 缓存key
func (o *CatUser) RedisKey() string {
    // 这里推荐: 1.用:分隔 1.如果有能够识别唯一标识的 id ,用它 -- (用id也行)
    // user_id 能唯一标识该数据 -- 同 id 类似
    return fmt.Sprintf("cat_user:%d", o.UserId)
}

func (o *CatUser) ArrayRedisKey() string {
    return fmt.Sprintf("cat_user")
}

// 缓存时间
func (o *CatUser) RedisDuration() time.Duration {
    // 这个时候可以用随机时间 解决缓存雪崩问题
    // 设置 30 ~ 60 分钟  -- 这里记得不要设置  0 ~ n 时间,因为万一是 0 相当于没有设置
    return time.Duration((rand.Intn(60-30) + 30)) * time.Minute
}

3.编写缓存操作

1.同步缓存

// SyncToRedis 添加缓存
// 使用 序列化后的 string 类型存储 缓存
func (o *CatUser) SyncToRedis(conn *redis.Conn) error {
    if o.RedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := json.Marshal(o)
    if err != nil {
        return errors.WithStack(err)
    }
    if err = conn.SetEX(context.Background(), o.RedisKey(), string(buf), o.RedisDuration()).Err(); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

2.获取缓存

// 获取缓存
// 1.判断是否存在 key
// 2.获取是否为空
// 3.判断是否缓存穿透
// 3.获取后反序列化

// GetFromRedis 获取缓存
func (o *CatUser) GetFromRedis(conn *redis.Conn) error {
    if o.RedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := conn.Get(context.Background(), o.RedisKey()).Bytes()
    if err != nil {
        if err == redis.Nil {
            return redis.Nil
        }
        return errors.WithStack(err)
    }
  // 是否出现过缓存穿透
    if string(buf) == "DISABLE" {
        return errors.New("not found data in redis nor db")
    }

    if err = json.Unmarshal(buf, o); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

3.删除缓存

func (o *CatUser) DeleteFromRedis(conn *redis.Conn) error {
    if o.RedisKey() != "" {
        if err := conn.Del(context.Background(), o.RedisKey()).Err(); err != nil {
            return errors.WithStack(err)
        }
    }
    // 同时删除数组缓存
    if o.ArrayRedisKey() != "" {
        if err := conn.Del(context.Background(), o.ArrayRedisKey()).Err(); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}

4.重点 – 获取数据

// MustGet 获取数据
// 1.先从缓存中获取
// 2.如果没找到 --找数据库 (也没找到--设置DISABLE 防止缓存穿透)
func (o *CatUser) MustGet(engine *gorm.DB, conn *redis.Conn) error {
    err := o.GetFromRedis(conn)
  // 如果为空证明找到了,提前返回不考虑后续操作
    if err == nil {
        return nil
    }

    if err != nil && err != redis.Nil {
        return errors.WithStack(err)
    }
    // 在缓存中没有找到这条数据,则从数据库中找
    var count int64
    if err = engine.Count(&count).Error; err != nil {
        return errors.WithStack(err)
    }
    // 如果 为 count =0  设置 DISABLE 防止缓存穿透
    if count == 0 {
        if err = conn.SetNX(context.Background(), o.RedisKey(), "DISABLE", o.RedisDuration()).Err(); err != nil {
            return errors.WithStack(err)
        }
        return errors.New("not found data in redis nor db")
    }

    // 这个时候找到了 -- 并且数据库中存在数据 --加锁防止缓存击穿
    // 设置 5 秒的互斥锁锁时间
    var mutex = o.RedisKey() + "_MUTEX"
    if err = conn.Get(context.Background(), mutex).Err(); err != nil {
    // 非 缓存为空 异常错误,提前报错
        if err != redis.Nil {
            return errors.WithStack(err)
        }
        // err == redis.Nil
    // 设置 5 s 的互斥锁时间
        if err = conn.SetNX(context.Background(), mutex, 1, 3*time.Second).Err(); err != nil {
            return errors.WithStack(err)
        }
        // 从数据库中查找
        if err = engine.First(&o).Error; err != nil {
            return errors.WithStack(err)
        }
        // 同步缓存
        if err = o.SyncToRedis(conn); err != nil {
            return errors.WithStack(err)
        }
        // 删除锁
        if err = conn.Del(context.Background(), mutex).Err(); err != nil {
            return errors.WithStack(err)
        }
    } else {
        // 这个时候不为空,加了锁 -- 进行循环等等待
        var index int
        for {
            if index > 10{
                return errors.New(mutex + " lock error")
            }
            if err2 := conn.Get(context.Background(), mutex).Err(); err2 != nil {
                break
            } else {
                time.Sleep(30 * time.Millisecond)
                index++
                continue
            }
        }
        if err = o.MustGet(engine, conn); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}

5. 数组操作相同

func (o *CatUser) ArraySyncToRedis(list []CatUser, conn *redis.Conn) error {
    if o.ArrayRedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := json.Marshal(list)
    if err != nil {
        return errors.WithStack(err)
    }
    if err = conn.SetEX(context.Background(), o.ArrayRedisKey(), string(buf), o.RedisDuration()).Err(); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

func (o *CatUser) ArrayGetFromRedis(conn *redis.Conn) ([]CatUser, error) {
    if o.RedisKey() == "" {
        return nil, errors.New("not set redis key")
    }
    buf, err := conn.Get(context.Background(), o.ArrayRedisKey()).Bytes()
    if err != nil {
        if err == redis.Nil {
            return nil, redis.Nil
        }
        return nil, errors.WithStack(err)
    }
    if string(buf) == "DISABLE" {
        return nil, errors.New("not found data in redis nor db")
    }

    var list []CatUser
    if err = json.Unmarshal(buf, &list); err != nil {
        return nil, errors.WithStack(err)
    }
    return list, nil
}

func (o *CatUser) ArrayDeleteFromRedis(conn *redis.Conn) error {
    return o.DeleteFromRedis(conn)
}

// ArrayMustGet
func (o *CatUser) ArrayMustGet(engine *gorm.DB, conn *redis.Conn) ([]CatUser, error) {
    list, err := o.ArrayGetFromRedis(conn)
    if err == nil {
        return list, nil
    }
    if err != nil && err != redis.Nil {
        return nil, errors.WithStack(err)
    }

    // not found in redis
    var count int64
    if err = engine.Count(&count).Error; err != nil {
        return nil, errors.WithStack(err)
    }
    if count == 0 {
        if err = conn.SetNX(context.Background(), o.ArrayRedisKey(), "DISABLE", o.RedisDuration()).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
        return nil, errors.New("not found data in redis nor db")
    }

    var mutex = o.ArrayRedisKey() + "_MUTEX"
    if err = conn.Get(context.Background(), mutex).Err(); err != nil {
        if err != redis.Nil {
            return nil, errors.WithStack(err)
        }
        // err = redis.Nil
        if err = conn.SetNX(context.Background(), mutex, 1, 3*time.Second).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
        if err = engine.Find(&list).Error; err != nil {
            return nil, errors.WithStack(err)
        }
        if err = o.ArraySyncToRedis(list, conn); err != nil {
            return nil, errors.WithStack(err)
        }
        if err = conn.Del(context.Background(), mutex).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
    } else {
        var index int
        for {
            if index > 10 {
                return nil, errors.New(mutex + " lock error")
            }
            if err2 := conn.Get(context.Background(), mutex).Err(); err2 != nil {
                break
            } else {
                time.Sleep(50 * time.Millisecond)
                index++
                continue
            }
        }
        list, err = o.ArrayMustGet(engine, conn)
        if err != nil {
            return nil, errors.WithStack(err)
        }
    }
    return list, nil
}

6.单元测试

func TestCatUser(t *testing.T) {
    db := InitDb()
    conn := InitRedis()

    t.Run("single", func(t *testing.T) {
        var cu CatUser
        engine := db.Model(&CatUser{}).Where("user_id=?", 1)
        cu.UserId = 1
        cu.DeleteFromRedis(conn)
        if err := cu.MustGet(engine, conn); err != nil {
            fmt.Printf("%+v", err)
            panic(err)
        }
        fmt.Println(cu)
    })

    t.Run("list", func(t *testing.T) {
        var cu CatUser
        engine := db.Model(&CatUser{})
        list, err := cu.ArrayMustGet(engine, conn)
        if err != nil {
            panic(err)
        }
        fmt.Println(list)
    })
}

7.代码总结

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/catbugdemo/errors"
    "github.com/go-redis/redis/v8"
    "gorm.io/gorm"
    "math/rand"
    "time"
)

// 首先选好一个数据库中的表
// 缓存选用的结构是 序列化后的 string   ---
// (hash 不选是因为hash 无法同时设置过期时间,防止缓存成为一个永久缓存 -- 哈哈我才不说是因为懒)

// 确定一个结构体
type CatUser struct {
    ID        uint      `gorm:"column:id" json:"id"`
    CreatedAt time.Time `gorm:"column:created_at" json:"createdAt"`
    UpdatedAt time.Time `gorm:"column:updated_at" json:"updatedAt"`
    UserId    int       `gorm:"column:user_id" json:"user_id"`
    Name      string    `gorm:"column:name" json:"name"`
    Password  string    `gorm:"cloumn:password" json:"password"`
}

// 然后写设置缓存 -- (因为这是开始 -- 同时也最简单)

// RedisKey 缓存key
func (o *CatUser) RedisKey() string {
    // 这里推荐: 1.用:分隔 1.如果有能够识别唯一标识的 id ,用它 -- (用id也行)
    // user_id 能唯一标识该数据 -- 同 id 类似
    return fmt.Sprintf("cat_user:%d", o.UserId)
}

func (o *CatUser) ArrayRedisKey() string {
    return fmt.Sprintf("cat_user")
}

// 时间
func (o *CatUser) RedisDuration() time.Duration {
    // 根据秒来识别
    // 这个时候可以用随机时间 解决缓存雪崩问题
    // 设置 30 ~ 60 分钟  -- 这里记得不要设置  0 ~ n 时间,因为万一是 0 相当于没有设置
    // 如果是 -1 则设置为永久
    return time.Duration((rand.Intn(60-30) + 30)) * time.Minute
}

// SyncToRedis 添加缓存
// 使用 序列化后的 string 类型存储 缓存
func (o *CatUser) SyncToRedis(conn *redis.Conn) error {
    if o.RedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := json.Marshal(o)
    if err != nil {
        return errors.WithStack(err)
    }
    if err = conn.SetEX(context.Background(), o.RedisKey(), string(buf), o.RedisDuration()).Err(); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

// 获取缓存
// 1.判断是否存在 key
// 2.获取是否为空
// 3.判断是否缓存穿透
// 3.获取后反序列化

// GetFromRedis 获取缓存
func (o *CatUser) GetFromRedis(conn *redis.Conn) error {
    if o.RedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := conn.Get(context.Background(), o.RedisKey()).Bytes()
    if err != nil {
        if err == redis.Nil {
            return redis.Nil
        }
        return errors.WithStack(err)
    }
    if string(buf) == "DISABLE" {
        return errors.New("not found data in redis nor db")
    }

    if err = json.Unmarshal(buf, o); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

func (o *CatUser) DeleteFromRedis(conn *redis.Conn) error {
    if o.RedisKey() != "" {
        if err := conn.Del(context.Background(), o.RedisKey()).Err(); err != nil {
            return errors.WithStack(err)
        }
    }
    // 设置
    if o.ArrayRedisKey() != "" {
        if err := conn.Del(context.Background(), o.ArrayRedisKey()).Err(); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}

// MustGet 获取数据
// 1.先从缓存中获取
// 2.如果没找到 --找数据库 (也没找到--设置DISABLE 防止缓存穿透)
func (o *CatUser) MustGet(engine *gorm.DB, conn *redis.Conn) error {
    err := o.GetFromRedis(conn)
    if err == nil {
        return nil
    }

    if err != nil && err != redis.Nil {
        return errors.WithStack(err)
    }
    // 在缓存中没有找到这条数据,则从数据库中找
    var count int64
    if err = engine.Count(&count).Error; err != nil {
        return errors.WithStack(err)
    }
    // 如果 为 count =0  设置 DISABLE 防止缓存穿透
    if count == 0 {
        if err = conn.SetNX(context.Background(), o.RedisKey(), "DISABLE", o.RedisDuration()).Err(); err != nil {
            return errors.WithStack(err)
        }
        return errors.New("not found data in redis nor db")
    }

    // 这个时候找到了 -- 并且数据库中存在数据 --加锁防止缓存击穿
    // 设置 5 秒的互斥锁锁时间
    var mutex = o.RedisKey() + "_MUTEX"
    if err = conn.Get(context.Background(), mutex).Err(); err != nil {
        if err != redis.Nil {
            return errors.WithStack(err)
        }
        // err == redis.Nil
        if err = conn.SetNX(context.Background(), mutex, 1, 5*time.Second).Err(); err != nil {
            return errors.WithStack(err)
        }
        // 从数据库中查找
        if err = engine.First(&o).Error; err != nil {
            return errors.WithStack(err)
        }
        // 同步缓存
        if err = o.SyncToRedis(conn); err != nil {
            return errors.WithStack(err)
        }
        // 删除锁
        if err = conn.Del(context.Background(), mutex).Err(); err != nil {
            return errors.WithStack(err)
        }
    } else {
        // 这个时候不为空,加了锁 -- 进行循环等等待
        var index int
        for {
            if index > 10 {
                return errors.New(mutex + " lock error")
            }
            if err2 := conn.Get(context.Background(), mutex).Err(); err2 != nil {
                break
            } else {
                time.Sleep(30 * time.Millisecond)
                index++
                continue
            }
        }
        if err = o.MustGet(engine, conn); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}

// 同样对数组进行操作

// ArraySyncToRedis 添加缓存
// 使用 序列化后的 string 类型存储 缓存
func (o *CatUser) ArraySyncToRedis(list []CatUser, conn *redis.Conn) error {
    if o.ArrayRedisKey() == "" {
        return errors.New("not set redis key")
    }
    buf, err := json.Marshal(list)
    if err != nil {
        return errors.WithStack(err)
    }
    if err = conn.SetEX(context.Background(), o.ArrayRedisKey(), string(buf), o.RedisDuration()).Err(); err != nil {
        return errors.WithStack(err)
    }
    return nil
}

// 获取缓存
// 1.判断是否存在 key
// 2.获取是否为空
// 3.判断是否缓存穿透
// 3.获取后反序列化

// ArrayGetFromRedis 获取缓存
func (o *CatUser) ArrayGetFromRedis(conn *redis.Conn) ([]CatUser, error) {
    if o.RedisKey() == "" {
        return nil, errors.New("not set redis key")
    }
    buf, err := conn.Get(context.Background(), o.ArrayRedisKey()).Bytes()
    if err != nil {
        if err == redis.Nil {
            return nil, redis.Nil
        }
        return nil, errors.WithStack(err)
    }
    if string(buf) == "DISABLE" {
        return nil, errors.New("not found data in redis nor db")
    }

    var list []CatUser
    if err = json.Unmarshal(buf, &list); err != nil {
        return nil, errors.WithStack(err)
    }
    return list, nil
}

// ArrayDeleteFromRedis 删除缓存
func (o *CatUser) ArrayDeleteFromRedis(conn *redis.Conn) error {
    return o.DeleteFromRedis(conn)
}

// ArrayMustGet
func (o *CatUser) ArrayMustGet(engine *gorm.DB, conn *redis.Conn) ([]CatUser, error) {
    list, err := o.ArrayGetFromRedis(conn)
    if err == nil {
        return list, nil
    }
    if err != nil && err != redis.Nil {
        return nil, errors.WithStack(err)
    }

    // not found in redis
    var count int64
    if err = engine.Count(&count).Error; err != nil {
        return nil, errors.WithStack(err)
    }
    if count == 0 {
        if err = conn.SetNX(context.Background(), o.ArrayRedisKey(), "DISABLE", o.RedisDuration()).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
        return nil, errors.New("not found data in redis nor db")
    }

    var mutex = o.ArrayRedisKey() + "_MUTEX"
    if err = conn.Get(context.Background(), mutex).Err(); err != nil {
        if err != redis.Nil {
            return nil, errors.WithStack(err)
        }
        // err = redis.Nil
        if err = conn.SetNX(context.Background(), mutex, 1, 5*time.Second).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
        if err = engine.Find(&list).Error; err != nil {
            return nil, errors.WithStack(err)
        }
        if err = o.ArraySyncToRedis(list, conn); err != nil {
            return nil, errors.WithStack(err)
        }
        if err = conn.Del(context.Background(), mutex).Err(); err != nil {
            return nil, errors.WithStack(err)
        }
    } else {
        var index int
        for {
            if index > 10 {
                return nil, errors.New(mutex + " lock error")
            }
            if err2 := conn.Get(context.Background(), mutex).Err(); err2 != nil {
                break
            } else {
                time.Sleep(30 * time.Millisecond)
                index++
                continue
            }
        }
        list, err = o.ArrayMustGet(engine, conn)
        if err != nil {
            return nil, errors.WithStack(err)
        }
    }
    return list, nil
}

总结

  • 按照上面的方法,就能够集合 包括 缓存穿透 + 缓存雪崩 + 缓存击穿的解决方法
  • 但是每次都要写这么多,单独一个还行,多了就吃不消了
  • 没关系,我写了一个 自动生成模板,直接用就行了

自动生成模板地址

结语

  • 感谢各位读者大大的阅读

参考

github.com/fwhezfwhez/model_conver...

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 2年前 自动加精
讨论数量: 2

其实用singleflight就可以解决缓存击穿的问题 不用写这么多代码的

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!