Redis重构

摘要

  1. 开发分支为: feature/cache_redis
  2. 缓存模块重构划分为2个阶段, 新版与旧版同时为开发提供服务. 原因:
    • 现有使用到redis的模块大约有几十个key,目前暂时无法评估出全部调整到新版所需的时间,所以暂时保留.
    • 新东西自然会引入新风险.需要使用一段时间.
    • 后面找个时间把现有的几十个key调整完毕后,老模块移除.
  3. 当下业内关于使用redis作为缓存技术已经成为主流.本次的模块封装是针对redis的封装,没有使用过多的多态接口等设计模式.

  4. 希望新的redis模块可以给大家带来 易读, 易写, 灵活, 效率 等方面的提升.

  5. game 和 gateway 的server 已经支持新的redis模块(只需要添加下启动配置参数即可),其他的server需要的话另行添加.

准备阶段

RedisServer

调整前: 原来我们的开发环境使用的redis server 是docker部署的集群方式.
调整后: 直接安装单实例版方式即可.

docker安装方式(普通):

// 拉取镜像
docker pull redis:latest
// 启动
docker run -p 6379:6379 --name my_redis -v /data/redis/redis.conf:/etc/redis/redis.conf  -v /data/redis/data:/var/redis_data -d redis redis-server /etc/redis/redis.conf --appendonly yes

docker安装方式(集群):
原来的集群存在外网IP连接失败问题,这里提供一个新的:
Docker-compose部署redis-cluster集群

windos安装方式:

https://github.com/MSOpenTech/redis/releases (自行下载)

配置文件

需要增加一个新的配置参数:

// 测试环境
"--cache_server_nodes", "func|192.168.0.155:6379|1,speed|192.168.0.155:6379|2,mutex|192.168.0.155:6379|3",

// 线上环境 (由于线上使用的是集群模式,只能只用0号库,所以下面的配置跟上面的配置区别主要是 DBIndex)
"--cache_server_nodes", "_cluster_|hicasino.*****.clustercfg.use2.cache.amazonaws.com:6379|0",
  1. 此参数值为 数组
  2. 每个元素的组成部分分别为:DB名称 | DB host | DB index
    • DB名称 只是给此db库起了名字
    • DB host 表示 redis服务器的地址和端口
    • DB index 表示 每个redis server 有 16个库可以被使用, 这个就是控制使用哪个库的. 通常 0号库作为测试用途,1-15作为业务用途.
  3. 通常我们提供三个元素(func,speed,mutex):
    • func : 此名称表示此db库用于功能开发,存储业务功能使用.
    • speed: 此名称表示此db库只是用于加速类的数据使用.
    • mutex: 此名称表示此db库用于分布式锁使用的key.
  4. 当配置的是集群的时候,注意:
    • 元素要保持一个且name=_cluster_, 因为是集群所以这里不需要多个元素.
    • host填一个即可. 包会自动扫描所有其他节点.

框架介绍

Key类型

  • 图中的黄色虚线范围为代码模块封装部分.

  • 模块提供如图的 Key string hash list set zset bit mutex 八个重要的基本对象,封装了大部分redis类型底层的常用命令

  • key管理着redis中的键,作为后面几个具体类型的基类.后面几个类型继承于key.

Reply类型

redis命令本身是有返回值的. 上面封装的各个类型的方法返回值(结果)几乎均由reply对象管理,reply对象本身提供了丰富的类型转换供使用,下面是部分方法列表:

高阶功能

  • 分布式锁
    微服务架构,分布式锁技术是不可或缺的. 目前业内较成熟的方案有 redis, etcd 两个方案, 当然也有自己造轮子的. 模块里面的 mutex 对象就是为了解决此问题的.
  • Pipe方法
    redis提供的pipe技术为了提高redis访问性能, 模块里的 Pipe 方法提供了方便友好的支撑.
  • Exec方法
    redis提供的exec技术为了提高redis写入性能, 模块里的 Exec 方法提供了方便友好的支撑.

使用规范

使用redis的时候,如何选择适合我们的redis类型(框架提供的这几个键类型对象)是非常重要的.

选择Key类型

思考选择合适的类型(参考):

上面只是一般情况下的选择思路. 日常开发中可能会遇到一些更复杂的管理数据,这个时候可能就需要多类型嵌套组合的方案了, 比如: 一个 hash 的 value 里面存储的是另一个 string 的 key…

相关目录(package)

  • 图中1号: 项目中管理redis数据的业务code的地方,主要包含 kes 和 对象模块, 日常开发中主要用到的目录.
  • 图中2号: 封装redis的package, redis封装模块所在地.
  • 图中3号: 编写的单元测试文件. 同时也提供了一些样例.

代码管理规范

在我们确定了要使用的redis类型之后,我们就要声明自己需要用到的key对象:

// [了解] key的基本属性
type key struct {
    DB     string        // 哪个db中(表示哪个db索引,如果redis服务采用集群部署的话=>所有db都落到0号库)
    KEY    string        // Key名称
    Coding ECoding       // 是否使用json编码
    Ttl    time.Duration // 存活时间ms(仅作存储,需要自行调用k.Overdue())
}
// [示例1]
func AllotPlayerIdKey(module string) *redis.String {
    return &redis.String{redis.Key("func", fmt.Sprintf("AllotPlayerId?module=%s", module), false)}
}
// [示例2]
func ApiLimitStageHtpKey(htp, api, stage string) *redis.Zset {
    return &redis.Zset{redis.Key("func",
        fmt.Sprintf("ApiLimitStageHtp?htp=%s&api=%s&stage=%s", htp, api, stage),
        true,
        20*time.Second)}
}

Key使用规范:

  • 定义一个函数,用来生成自己的redis key类型.

  • 此函数需要定义在 上图中的 1号目录里的keys.go文件中(internal/pkg/cache/keys.go).

  • 此函数形式: func xxxxxxKey(…) *redis.tttt {}

    • xxxx 为自己的业务命名(不要太短),
    • tttt 为redis package 里的类型(String,List,Set…)
  • 为key的属性字段赋值.

  • key.KEY 名称形式: xxxxxx?arg1=val1&arg2=val2&…

    • xxxxx 为key名称, 参数通过?符号连接, 参数之间用&符号连接, 参数与只之间用=号连接(跟http的query参数格式一样).

keys.go内容排版:

用Struct对象封装自己的Key:
有时候你可能不想赤裸裸的是用Key对象. 期望用struct封装一下相关的业务模块化代码.
可能是一组函数, 或者 struct 对象. 此时可以启用单独的.go文件存放于 cache目录下. 举个例子:

简单使用

参考单元测试代码:

高阶使用

一. OOP封装管理自己的Key

// 构造函数(供业务方生成对象)
func Manager(acct string) *manager {
    return &manager{
        key: ManagerUsersKey(),
        Acct: acct,
    }
}

// 用 Struct 封装
type manager struct {
    key *redis.Hash // key指针总是放最前面, 提高阅读性

    // 入参部分
    Acct string `json:"-"`
    ...

    // 临时数据部分
    Id       int64  `json:"id"`
    ...
}

// 封装一系列的管理key数据的方法
func (this *manager) Load() error
func (this *manager) Add(passwd, name string, role int)
func (this *manager) Xxxx(...)

二. 分布式锁使用

// 这里只是演示mutex的用法

// go 1 (假设是进程A的)
    go func() {
        defer wg.Done()

        // ====> !!!重点 get mutex
        m := SomeFunMutex("game")
        m.Lock()
        defer m.Unlock()
        // --------------

        util.Sleep(1 * time.Second)
        base += "x"
    }()

    // go 2 (假设是进程B的)
    begin := time.Now()
    diff := time.Duration(0)
    go func() {
        defer wg.Done()

        // ====> !!!重点 get mutex
        m := SomeFunMutex("game")
        if err := m.Lock(); err != nil {
            t.Error(err)
        }
        defer m.Unlock()
        // --------------

        base += "y"
        diff = time.Since(begin)
    }()
    wg.Wait()

    // 如果不使用mutex 则 base = "yx".
    if base != "xy" || diff.Seconds() < 1 {
        t.Error(`base != "xy" || diff.Seconds() < 1`, base, diff)
    }

三. redis.Pipe使用
提供批量处理redis命令的能力,非原子的. 正确使用以提高读写性能

// demo 1
redis.Pipe(
        redis.Sender(this.key, func() {this.key().Del("stage_at", "cost_time", "total_times", "error_times")}),
        redis.Sender(this.key, func() {this.key().Set("stage_at", flake.StatisAt, true)}),
        ... 可以挂多个 Sender
        )

// demo 2 (跟 demo 1的用途一样, 写法不一样)
redis.Pipe(
        redis.Sender(this.key, func() {
        this.key().Del("stage_at", "cost_time", "total_times", "error_times")
        this.key().Set("stage_at", flake.StatisAt, true)
        },func(rps []*redis.Resp){ ...在此处理上面批量执行命令的结果... }))

// demo 3
redis.Pipe(redis.Sender(this.key, func() {
            // 循环执行命令
            for u, times := range userTimes {
                this.key.Incr(u, times)
            }
        }, func(rps []*redis.Resp){ ...在此处理上面批量执行命令的结果... }))

四. redis.Exec使用
事务原子操作,统一返回结果, 正确使用以提高写性能

err := redis.Exec(redis.Sender(this.key, func() {
        util.Cast(flake.CostTime != 0, func() { this.key.IncrBy("cost_time", flake.CostTime.Milliseconds()) }, nil)
        util.Cast(flake.TotalTimes != 0, func() { this.key.IncrBy("total_times", flake.TotalTimes) }, nil)
        util.Cast(flake.ErrorTimes != 0, func() { this.key.IncrBy("error_times", flake.ErrorTimes) }, nil)
    }})

问题&坑

  • reply结果类型转换
    redis.reply 虽然提供了种类丰富的结果类型转换,但如果你本身对redis命令不够熟悉.则可能会对方法的执行结果不确定该用哪种类型. 下面提供几种思路供大家选择:

    • a. https://www.redis.com.cn/commands.html 到命令手册中找到原始的命令介绍.然后找到命令的返回值,根据返回值特点确定结果转换方法.
    • b. 使用 reply.String 方法打印出返回结果.根据结果值特征确定自己想要的转换方法.
    • c. 使用 reply.StringMap or reply.Values 方法打印出返回结果.根据结果值特征确定自己想要的转换方法.
  • TTL过期问题

    // 底层 key 结构中的 Ttl 字段 虽然管理着 此key的生命周期, 
    // 但是它并不能智能的帮你管理key的真正ttl(因为各个命令以及方法的复杂性).需要自行手动决定.
    type key struct {
      ...
      Ttl    time.Duration // 存活时间ms(仅作存储,需要自行调用k.Overdue())
    }
    
    • 目前只有 redis.String.Set 方法内部自动处理了key的ttl. 其他的key类型以及方法中不自动处理ttl.
    • 在日常开发中如果需要更新key的ttl,需要手动执行 key.Overdue() 方法:
      // 某个缓存key的管理对象中的 Update 方法
      func (this *LimitStage) Update(userTimes map[int64]int) error {
        defer this.key().Overdue() // 更新完后 更新ttl (如果缺失此code,key本身的ttl将没有影响)
        return this.key().Incr(uid, userTimes[uid]) // 更新值
      }
      
  • 集群带来的redis.Exec使用问题

    • 这个问题是分布式集群带来的,如果不是采用分布式集群的 redis server 部署方案,则不会有此问题.
    • Redis 集群使用数据分片(sharding)而非一致性哈希(consistency hashing)来实现: 一个 Redis 集群包含 16384 个哈希槽(hash slot), 数据库中的每个键都属于这 16384 个哈希槽的其中一个.

      由于 redis.Exec 是原子操作的,如果管道中的命令用到的键不是同一个,则存在被集群打散的风险. 相反如果此方法都是针对同一个键进行执行的,则不会踩到这个坑.

  • 开发环境Redis集群不完善问题
    • 在开发环境中我们.我们现在使用的是自搭建的集群,此集群存在暴漏不出外部访问IP的问题.我尝试好久没能解决.
    • 使用的新的packet连接老的本地VM中的redis-cluster会出现IP访问不了的问题.
    • 解决方案:参见 准备阶段-RedisServer章节 的本地集群部分.