Redis 技术整理

守听  金牌会员 | 2023-12-3 16:33:29 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 873|帖子 873|积分 2619

认识Redis

Redis官网:https://redis.io/
Redis诞生于2009年全称是Remote  Dictionary Server 远程词典服务器,是一个基于内存的键值型NoSQL数据库
特征

  • 键值(key-value)型,value支持多种不同数据结构,功能丰富
  • 单线程,每个命令具备原子性
  • 低延迟,速度快(基于内存.IO多路复用.良好的编码)
  • 支持数据持久化
  • 支持主从集群、分片集群
NoSQL可以翻译做Not Only SQL(不仅仅是SQL),或者是No SQL(非SQL的)数据库。是相对于传统关系型数据库而言,有很大差异的一种特殊的数据库,因此也称之为非关系型数据库
关系型数据是结构化的,即有严格要求,而NoSQL则对数据库格式没有严格约束,往往形式松散,自由
可以是键值型:

也可以是文档型:

甚至可以是图格式:

在事务方面:

  • 传统关系型数据库能满足事务ACID的原则
  • 非关系型数据库往往不支持事务,或者不能严格保证ACID的特性,只能实现基本的一致性
除了上面说的,在存储方式.扩展性.查询性能上关系型与非关系型也都有着显著差异,总结如下:


  • 存储方式

    • 关系型数据库基于磁盘进行存储,会有大量的磁盘IO,对性能有一定影响
    • 非关系型数据库,他们的操作更多的是依赖于内存来操作,内存的读写速度会非常快,性能自然会好一些



  • 扩展性

    • 关系型数据库集群模式一般是主从,主从数据一致,起到数据备份的作用,称为垂直扩展。
    • 非关系型数据库可以将数据拆分,存储在不同机器上,可以保存海量数据,解决内存大小有限的问题。称为水平扩展。
    • 关系型数据库因为表之间存在关联关系,如果做水平扩展会给数据查询带来很多麻烦

安装Redis

企业都是基于Linux服务器来部署项目,而且Redis官方也没有提供Windows版本的安装包
本文选择的Linux版本为CentOS 7
单机安装


  • 安装需要的依赖
  1. yum install -y gcc tcl
复制代码

  • 上传压缩包并解压
  1. tar -zxf redis-7.0.12.tar.gz
复制代码

  • 进入解压的redis目录
  1. cd redis-7.0.12
复制代码

  • 编译并安装
  1. make && make install
复制代码
默认的安装路径是在 /usr/local/bin目录下

该目录已经默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:

  • redis-cli:是redis提供的命令行客户端
  • redis-server:是redis的服务端启动脚本
  • redis-sentinel:是redis的哨兵启动脚本
启动Redis

redis的启动方式有很多种,例如:

  • 默认启动
  • 指定配置启动
  • 开机自启
默认启动

安装完成后,在任意目录输入redis-server命令即可启动Redis:
  1. redis-server
复制代码
这种启动属于“前台启动”,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C则Redis停止
指定配置启动

如果要让Redis以“后台”方式启动,则必须修改Redis配置文件,就在之前解压的redis安装包下(/usr/local/src/redis-6.2.6),名字叫redis.conf
修改redis.conf文件中的一些配置:可以先拷贝一份再修改
  1. # 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
  2. bind 0.0.0.0
  3. # 守护进程,修改为yes后即可后台运行
  4. daemonize yes
  5. # 密码,设置后访问Redis必须输入密码
  6. requirepass 072413
复制代码
Redis的其它常见配置:
  1. # 监听的端口
  2. port 6379
  3. # 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
  4. dir .
  5. # 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
  6. databases 1
  7. # 设置redis能够使用的最大内存
  8. maxmemory 512mb
  9. # 日志文件,默认为空,不记录日志,可以指定日志文件名
  10. logfile "redis.log"
复制代码
启动Redis:
  1. # 进入redis安装目录
  2. cd /opt/redis-6.2.13
  3. # 启动
  4. redis-server redis.conf
复制代码
停止服务:
  1. # 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
  2. # 因为之前配置了密码,因此需要通过 -u 来指定密码
  3. redis-cli -u password shutdown
复制代码
开机自启

可以通过配置来实现开机自启。
首先,新建一个系统服务文件:
  1. vim /etc/systemd/system/redis.service
复制代码
内容如下:
  1. [Unit]
  2. Description=redis-server
  3. After=network.target
  4. [Service]
  5. Type=forking
  6. ExecStart=/usr/local/bin/redis-server /opt/redis-7.0.12/redis.conf
  7. PrivateTmp=true
  8. [Install]
  9. WantedBy=multi-user.target
复制代码
然后重载系统服务:
  1. systemctl daemon-reload
复制代码
现在,我们可以用下面这组命令来操作redis了:
  1. # 启动
  2. systemctl start redis
  3. # 停止
  4. systemctl stop redis
  5. # 重启
  6. systemctl restart redis
  7. # 查看状态
  8. systemctl status redis
复制代码
执行下面的命令,可以让redis开机自启:
  1. systemctl enable redis
复制代码
主从集群安装

主:具有读写操作
从:只有读操作


  • 修改redis.conf文件
  1. # 开启RDB
  2. # save ""
  3. save 3600 1
  4. save 300 100
  5. save 60 10000
  6. # 关闭AOF
  7. appendonly no
复制代码

  • 将上面的redis.conf文件拷贝到不同地方
  1. # 方式一:逐个拷贝
  2. cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7001
  3. cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7002
  4. cp /usr/local/bin/redis-7.0.12/redis.conf /tmp/redis-7003
  5. # 方式二:管道组合命令,一键拷贝
  6. echo redis-7001 redis-7002 redis-7003 | xargs -t -n 1 cp /usr/local/bin/redis-7.0.12/redis.conf
复制代码

  • 修改各自的端口、rdb目录改为自己的目录
  1. sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/redis-7001\//g' redis-7001/redis.conf
  2. sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/redis-7002\//g' redis-7002/redis.conf
  3. sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/redis-7003\//g' redis-7003/redis.conf
复制代码

  • 修改每个redis节点的IP声明。虚拟机本身有多个IP,为了避免将来混乱,需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下:
  1. # redis实例的声明 IP
  2. replica-announce-ip IP地址
  3. # 逐一执行
  4. sed -i '1a replica-announce-ip 192.168.150.101' redis-7001/redis.conf
  5. sed -i '1a replica-announce-ip 192.168.150.101' redis-7002/redis.conf
  6. sed -i '1a replica-announce-ip 192.168.150.101' redis-7003/redis.conf
  7. # 或者一键修改
  8. printf '%s\n' redis-7001 redis-7002 redis-7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.150.101' {}/redis.conf
复制代码

  • 启动
  1. # 第1个
  2. redis-server redis-7001/redis.conf
  3. # 第2个
  4. redis-server redis-7002/redis.conf
  5. # 第3个
  6. redis-server redis-7003/redis.conf
  7. # 一键停止
  8. printf '%s\n' redis-7001 redis-7002 redis-7003 | xargs -I{} -t redis-cli -p {} shutdown
复制代码

  • 开启主从关系:配置主从可以使用replicaof 或者slaveof(5.0以前)命令
永久配置:在redis.conf中添加一行配置
  1. slaveof <masterip> <masterport>
复制代码
临时配置:使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效)
  1. # 5.0以后新增命令replicaof,与salveof效果一致slaveof <masterip> <masterport>
复制代码
卸载Redis


  • 查看redis是否启动
  1. ps aux | grep redis
复制代码

  • 若启动,则杀死进程
  1. kill -9 PID
复制代码


  • 停止服务
  1. redis-cli shutdown
复制代码

  • 查看/usr/local/lib目录中是否有与Redis相关的文件
  1. ll /usr/local/bin/redis-*
  2. # 有的话就删掉
  3. rm -rf /usr/local/bin/redis-*
复制代码
Redis客户端工具

命令行客户端

Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:
  1. redis-cli [options] [commonds]
复制代码
其中常见的options有:

  • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 072413:指定redis的访问密码
其中的commonds就是Redis的操作命令,例如:

  • ping:与redis服务端做心跳测试,服务端正常会返回pong
不指定commond时,会进入redis-cli的交互控制台:

图形化客户端

地址:https://github.com/uglide/RedisDesktopManager
不过该仓库提供的是RedisDesktopManager的源码,并未提供windows安装包。
在下面这个仓库可以找到安装包:https://github.com/lework/RedisDesktopManager-Windows/releases
下载之后,解压、安装


Redis默认有16个仓库,编号从0至15.  通过配置文件可以设置仓库数量,但是不超过16,并且不能自定义仓库名称。
如果是基于redis-cli连接Redis服务,可以通过select命令来选择数据库
  1. # 选择 0号库
  2. select 0
复制代码
Redis常见命令 / 对象

Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:

查命令的官网: https://redis.io/commands
在交互界面使用 help 命令查询:
  1. help [command]
复制代码

通用命令

通用指令是部分数据类型都可以使用的指令,常见的有:

  • KEYS:查看符合模板的所有key。在生产环境下,不推荐使用keys 命令,因为这个命令在key过多的情况下,效率不高
  • DEL:删除一个指定的key
  • EXISTS:判断key是否存在
  • EXPIRE:给一个key设置有效期,有效期到期时该key会被自动删除。内存非常宝贵,对于一些数据,我们应当给他一些过期时间,当过期时间到了之后,他就会自动被删除

    • 当使用EXPIRE给key设置的有效期过期了,那么此时查询出来的TTL结果就是-2
    • 如果没有设置过期时间,那么TTL返回值就是-1

  • TTL:查看一个KEY的剩余有效期

String命令

使用场景:

  • 验证码保存
  • 不易变动的对象保存
  • 简单锁的保存
String类型,也就是字符串类型,是Redis中最简单的存储类型
其value是字符串,不过根据字符串的格式不同,又可以分为3类:

  • string:普通字符串
  • int:整数类型,可以做自增.自减操作
  • float:浮点类型,可以做自增.自减操作

String的常见命令有:

  • SET:添加或者修改已经存在的一个String类型的键值对,对于SET,若key不存在则为添加,存在则为修改
  • GET:根据key获取String类型的value
  • MSET:批量添加多个String类型的键值对
  • MGET:根据多个key获取多个String类型的value
  • INCR:让一个整型的key自增1
  • INCRBY:让一个整型的key自增并指定步长

    • incrby num 2 让num值自增2
    • 也可以使用负数,是为减法,如:incrby num -2 让num值-2。此种类似 DECR 命令,而DECR是每次-1

  • INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
  • SETNX:添加一个String类型的键值对(key不存在为添加,存在则不执行)
  • SETEX:添加一个String类型的键值对,并且指定有效期
注:以上命令除了INCRBYFLOAT 都是常用命令
key问题

key的设计

Redis没有类似MySQL中的Table的概念,我们该如何区分不同类型的key?
可以通过给key添加前缀加以区分,不过这个前缀不是随便加的,有一定的规范
Redis的key允许有多个单词形成层级结构,多个单词之间用:隔开,格式如下:

这个格式并非固定,也可以根据自己的需求来删除或添加词条
如项目名称叫 automation,有user和product两种不同类型的数据,我们可以这样定义key:

  • user相关的key:automation:user:1
  • product相关的key:automation:product:1
同时还需要满足:

  • key的长度最好别超过44字节(3.0版本是39字节)
  • key中别包含特殊字符
BigKey问题

BigKey通常“以Key的大小和Key中成员的数量来综合判定”,例如:

  • Key本身的数据量过大:一个String类型的Key,它的值为5 MB
  • Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个
  • Key中成员的数据量过大:一个Hash类型的Key,它的成员数量虽然只有1,000个但这些成员的Value(值)总大小为100 MB
判定元素大小的方式
  1. MEMORY USAGE key        # 查看某个key的内存大小,不建议使用:因为此命令对CPU使用率较高
  2. # 衡量值 或 值的个数
  3. STRLEN key                # string结构 某key的长度
  4. LLEN key                # list集合 某key的值的个数
  5. .............
复制代码
推荐值

  • 单个key的value小于10KB
  • 对于集合类型的key,建议元素数量小于1000
BigKey的危害

  • 网络阻塞:对BigKey执行读请求时,少量的QPS就可能导致带宽使用率被占满,导致Redis实例,乃至所在物理机变慢
  • 数据倾斜:BigKey所在的Redis实例内存使用率远超其他实例,无法使数据分片的内存资源达到均衡
  • Redis阻塞:对元素较多的hash、list、zset等做运算会耗时较旧,使主线程被阻塞
  • CPU压力:对BigKey的数据序列化和反序列化会导致CPU的使用率飙升,影响Redis实例和本机其它应用
如何发现BigKey


  • redis-cli --bigkeys 命令:redis-cli -a 密码 --bigkeys
此命令可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的key
不足:返回的是内存大小是TOP1的key,而此key不一定是BigKey,同时TOP2、3.......的key也不一定就不是BigKey

  • scan命令扫描:]每次会返回2个元素,第一个是下一次迭代的光标(cursor),第一次光标会设置为0,当最后一次scan 返回的光标等于0时,表示整个scan遍历结束了,第二个返回的是List,一个匹配的key的数组
  1. 127.0.0.1:7001> help SCAN
  2. SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
  3. summary: Incrementally iterate the keys space
  4. since: 2.8.0
  5. group: generic
复制代码
自定义代码来判定是否为BigKey
  1. import org.junit.jupiter.api.AfterEach;
  2. import org.junit.jupiter.api.BeforeEach;
  3. import org.junit.jupiter.api.Test;
  4. import redis.clients.jedis.Jedis;
  5. import redis.clients.jedis.ScanResult;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. public class JedisTest {
  10.     private final static int STR_MAX_LEN = 10 * 1024;
  11.     private final static int HASH_MAX_LEN = 500;
  12.    
  13.     private Jedis jedis;
  14.     @BeforeEach
  15.     void setUp() {
  16.         // 1.建立连接
  17.         jedis = new Jedis("192.168.146.100", 6379);
  18.         // 2.设置密码
  19.         jedis.auth("072413");
  20.         // 3.选择库
  21.         jedis.select(0);
  22.     }
  23.     @Test
  24.     void testScan() {
  25.         int maxLen = 0;
  26.         long len = 0;
  27.         String cursor = "0";
  28.         do {
  29.             // 扫描并获取一部分key
  30.             ScanResult<String> result = jedis.scan(cursor);
  31.             // 记录cursor
  32.             cursor = result.getCursor();
  33.             List<String> list = result.getResult();
  34.             if (list == null || list.isEmpty()) {
  35.                 break;
  36.             }
  37.             // 遍历
  38.             for (String key : list) {
  39.                 // 判断key的类型
  40.                 String type = jedis.type(key);
  41.                 switch (type) {
  42.                     case "string":
  43.                         len = jedis.strlen(key);
  44.                         maxLen = STR_MAX_LEN;
  45.                         break;
  46.                     case "hash":
  47.                         len = jedis.hlen(key);
  48.                         maxLen = HASH_MAX_LEN;
  49.                         break;
  50.                     case "list":
  51.                         len = jedis.llen(key);
  52.                         maxLen = HASH_MAX_LEN;
  53.                         break;
  54.                     case "set":
  55.                         len = jedis.scard(key);
  56.                         maxLen = HASH_MAX_LEN;
  57.                         break;
  58.                     case "zset":
  59.                         len = jedis.zcard(key);
  60.                         maxLen = HASH_MAX_LEN;
  61.                         break;
  62.                     default:
  63.                         break;
  64.                 }
  65.                 if (len >= maxLen) {
  66.                     System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len);
  67.                 }
  68.             }
  69.         } while (!cursor.equals("0"));
  70.     }
  71.    
  72.     @AfterEach
  73.     void tearDown() {
  74.         if (jedis != null) {
  75.             jedis.close();
  76.         }
  77.     }
  78. }
复制代码

  • 第三方工具


  • 利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况

  • 网络监控


  • 自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
  • 一般阿里云搭建的云服务器就有相关监控页面

如何删除BigKey

BigKey内存占用较多,即便是删除这样的key也需要耗费很长时间,导致Redis主线程阻塞,引发一系列问题

  • redis 3.0 及以下版本:如果是集合类型,则遍历BigKey的元素,先逐个删除子元素,最后删除BigKey


  • Redis 4.0以后:使用异步删除的命令 unlink
  1. 127.0.0.1:7001> help UNLINK
  2. UNLINK key [key ...]
  3. summary: Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.
  4. since: 4.0.0
  5. group: generic
复制代码
解决BigKey问题

上一节是删除BigKey,但是数据最终还是未解决
要解决BigKey:

  • 选择合适的数据结构(String、Hash、List、Set、ZSet、Stream、GEO、HyperLogLog、BitMap)
  • 将大数据拆为小数据,具体根据业务来
如:一个对象放在hash中,hash底层会使用ZipList压缩,但entry数量超过500时(看具体redis版本),会使用哈希表而不是ZipList
  1. # 查看redis的entry数量
  2. [root@zixq ~]# redis-cli
  3. 127.0.0.1:7001> config get hash-max-ziplist-entries
  4. # 修改redis的entry数量                别太离谱即可
  5. config set hash-max-ziplist-entries
复制代码
因此:一个hash的key中若是field-value约束在一定的entry以内即可,超出的就用另一个hash的key来存储,具体实现以业务来做
Hash命令

使用场景:

  • 易改变对象的保存
  • 分布式锁的保存(Redisson分布式锁的实现原理)
这个在工作中使用频率很高
Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。
String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:

Hash类型的常见命令

  • HSET key field value:添加或者修改hash类型key的field的值。同理:操作不存在数据是为新增,存在则为修改
  • HGET key field:获取一个hash类型key的field的值
  • HMSET:批量添加多个hash类型key的field的值
  • HMGET:批量获取多个hash类型key的field的值
  • HGETALL:获取一个hash类型的key中的所有的field和value
  • HKEYS:获取一个hash类型的key中的所有的field
  • HINCRBY:让一个hash类型key的field的value值自增并指定步长
  • HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行
List命令 - 命令规律开始变化

Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索,也可以支持反向检索。
特征也与LinkedList类似:

  • 有序
  • 元素可以重复
  • 插入和删除快
  • 查询速度一般
使用场景:

  • 朋友圈点赞列表
  • 评论列表
List的常见命令有:

  • LPUSH key element ... :向列表左侧插入一个或多个元素
  • LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil
  • RPUSH key element ... :向列表右侧插入一个或多个元素
  • RPOP key:移除并返回列表右侧的第一个元素
  • LRANGE key star end:返回一段角标范围内的所有元素
  • BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil

Set命令

Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:

  • 无序
  • 元素不可重复
  • 查找快
  • 支持交集.并集.差集等功能
使用场景:

  • 一人一次的业务。如:某商品一个用户只能买一次
  • 共同拥有的业务,如:关注、取关与共同关注
Set类型的常见命令

  • SADD key member ... :向set中添加一个或多个元素
  • SREM key member ... :移除set中的指定元素
  • SCARD key:返回set中元素的个数
  • SISMEMBER key member:判断一个元素是否存在于set中
  • SMEMBERS:获取set中的所有元素
  • SINTER key1 key2 ... :求key1与key2的交集
  • SDIFF key1 key2 ... :求key1与key2的差集
  • SUNION key1 key2 ..:求key1和key2的并集
SortedSet / ZSet 命令

Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。
SortedSet具备下列特性:

  • 可排序
  • 元素不重复
  • 查询速度快
使用场景:

  • 排行榜
SortedSet的常见命令有:

  • ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
  • ZREM key member:删除sorted set中的一个指定元素
  • ZSCORE key member : 获取sorted set中的指定元素的score值
  • ZRANK key member:获取sorted set 中的指定元素的排名
  • ZCARD key:获取sorted set中的元素个数
  • ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
  • ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
  • ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
  • ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
  • ZDIFF.ZINTER.ZUNION:求差集.交集.并集
注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可,例如:

  • 升序获取sorted set 中的指定元素的排名:ZRANK key member
  • 降序获取sorted set 中的指定元素的排名:ZREVRANK key memeber
Stream命令

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:


  • 创建消费者组
  1. XGROUP CREATE key groupName ID [MKSTREAM]
复制代码

  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

  • 删除指定的消费者组
  1. XGROUP DESTORY key groupName
复制代码

  • 给指定的消费者组添加消费者
  1. XGROUP CREATECONSUMER key groupname consumername
复制代码

  • 删除消费者组中的指定消费者
  1. XGROUP DELCONSUMER key groupname consumername
复制代码

  • 从消费者组读取消息
  1. XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
复制代码

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

    • ">":从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
消费者监听消息的基本思路:

GEO命令

GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据
常见的命令有:

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  • GEODIST:计算指定的两个点之间的距离并返回
  • GEOHASH:将指定member的坐标转为hash字符串形式并返回
  • GEOPOS:返回指定member的坐标
  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
  • GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
  • GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
BitMap命令

bit:指的就是bite,二进制,里面的内容就是非0即1咯
map:就是说将适合使用0或1的业务进行关联。如:1为签到、0为未签到,这样就直接使用某bite就可表示出一个用户一个月的签到情况,减少内存花销了
BitMap底层是基于String实现的,因此:在Java中BitMap相关的操作封装到了redis的String操作中
BitMap的操作命令有:

  • SETBIT:向指定位置(offset)存入一个0或1
  • GETBIT :获取指定位置(offset)的bit值
  • BITCOUNT :统计BitMap中值为1的bit位的数量
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
HyperLogLog 命令

UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值
相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb。作为代价,其测量结果是概率性的,有小于0.81%的误差,同时此结构自带去重
常用命令如下:目前也只有这些命令

PubSub 发布订阅

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道。pattern支持的通配符如下:
  1. ?                        表示 一个 字符                        如:h?llo 则可以为 hallo、hxllo
  2. *                        表示 0个或N个 字符                   如:h*llo 则可以为 hllo、heeeello.........
  3. [ae]                表示 是a或e都行                        如:h[ae]llo 则可以为  hello、hallo
复制代码
优点:

  • 采用发布订阅模型,支持多生产、多消费
缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失
Java操作:Jedis

官网:https://redis.io/docs/clients/
其中Java客户端也包含很多:

标记为❤的就是推荐使用的Java客户端,包括:

  • Jedis和Lettuce:这两个主要是提供了“Redis命令对应的API”,方便我们操作Redis,而SpringDataRedis又对这两种做了抽象和封装
  • Redisson:是在Redis基础上实现了分布式的可伸缩的Java数据结构,例如Map.Queue等,而且支持跨进程的同步机制:Lock.Semaphore等待,比较适合用来实现特殊的功能需求
入门Jedis

创建Maven项目

  • 依赖
  1. <dependency>
  2.     <groupId>redis.clients</groupId>
  3.     <artifactId>jedis</artifactId>
  4.     <version>3.7.0</version>
  5. </dependency>
复制代码

  • 测试:其他类型如Hash、Set、List、SortedSet和下面String是一样的用法
  1. package com.zixieqing.redis;
  2. import org.junit.After;
  3. import org.junit.Before;
  4. import org.junit.Test;
  5. import redis.clients.jedis.Jedis;
  6. /**
  7. * jedis操作redis:redis的命令就是jedis对应的API
  8. *
  9. * @author : ZiXieqing
  10. */
  11. public class QuickStartTest {
  12.     private Jedis jedis;
  13.     @Before
  14.     public void setUp() throws Exception {
  15.         jedis = new Jedis("host", 6379);
  16.         // 设置密码
  17.         jedis.auth("072413");
  18.         // 设置库
  19.         jedis.select(0);
  20.     }
  21.     @After
  22.     public void tearDown() throws Exception {
  23.         if (null != jedis) jedis.close();
  24.     }
  25.     /**
  26.      * String类型
  27.      */
  28.     @Test
  29.     public void stringTest() {
  30.         // 添加key-value
  31.         String result = jedis.set("name", "zixieqing");
  32.         System.out.println("result = " + result);
  33.         // 通过key获取value
  34.         String value = jedis.get("name");
  35.         System.out.println("value = " + value);
  36.         // 批量添加或修改
  37.         String mset = jedis.mset("age", "18", "sex", "girl");
  38.         System.out.println("mset = " + mset);
  39.         System.out.println("jedis.keys() = " + jedis.keys("*"));
  40.         // 给key自增并指定步长
  41.         long incrBy = jedis.incrBy("age", 5L);
  42.         System.out.println("incrBy = " + incrBy);
  43.         // 若key不存在,则添加,存在则不执行
  44.         long setnx = jedis.setnx("city", "hangzhou");
  45.         System.out.println("setnx = " + setnx);
  46.         // 添加key-value,并指定有效期
  47.         String setex = jedis.setex("job", 10L, "Java");
  48.         System.out.println("setex = " + setex);
  49.         // 获取key的有效期
  50.         long ttl = jedis.ttl("job");
  51.         System.out.println("ttl = " + ttl);
  52.     }
  53. }
复制代码
连接池

Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,推荐使用Jedis连接池代替Jedis的直连方式
  1. package com.zixieqing.redis.util;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. import redis.clients.jedis.JedisPoolConfig;
  5. import java.time.Duration;
  6. /**
  7. * Jedis连接池
  8. *
  9. * @author : ZiXieqing
  10. */
  11. public class JedisConnectionFactory {
  12.     private static JedisPool jedisPool;
  13.     static {
  14.         // 设置连接池
  15.         JedisPoolConfig PoolConfig = new JedisPoolConfig();
  16.         PoolConfig.setMaxTotal(30);
  17.         PoolConfig.setMaxIdle(30);
  18.         PoolConfig.setMinIdle(0);
  19.         PoolConfig.setMaxWait(Duration.ofSeconds(1));
  20.         /*
  21.             设置链接对象
  22.             JedisPool(GenericObjectPoolConfig<Jedis> poolConfig, String host, int port, int timeout, String password)
  23.          */
  24.         jedisPool = new JedisPool(PoolConfig, "192.168.46.128", 6379, 1000, "072413");
  25.     }
  26.     public static Jedis getJedis() {
  27.         return jedisPool.getResource();
  28.     }
  29. }
复制代码
Java操作:SpringDataRedis

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis
官网:https://spring.io/projects/spring-data-redis

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于JDK.JSON、字符串、Spring对象的数据序列化及反序列化
  • 支持基于Redis的JDKCollection实现
SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

入门SpringDataRedis

创建SpringBoot项目

  • pom.xml配置
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>2.3.9.RELEASE</version>
  9.         <relativePath/>
  10.     </parent>
  11.     <groupId>com.zixieqing</groupId>
  12.     <artifactId>02-spring-data-redis</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <name>02-spring-data-redis</name>
  15.     <description>Demo project for Spring Boot</description>
  16.     <properties>
  17.         <java.version>8</java.version>
  18.     </properties>
  19.     <dependencies>
  20.         
  21.         <dependency>
  22.             <groupId>org.springframework.boot</groupId>
  23.             <artifactId>spring-boot-starter-data-redis</artifactId>
  24.         </dependency>
  25.         
  26.         <dependency>
  27.             <groupId>org.apache.commons</groupId>
  28.             <artifactId>commons-pool2</artifactId>
  29.         </dependency>
  30.         
  31.         <dependency>
  32.             <groupId>com.fasterxml.jackson.core</groupId>
  33.             <artifactId>jackson-databind</artifactId>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.projectlombok</groupId>
  37.             <artifactId>lombok</artifactId>
  38.             <optional>true</optional>
  39.         </dependency>
  40.         <dependency>
  41.             <groupId>org.springframework.boot</groupId>
  42.             <artifactId>spring-boot-starter-test</artifactId>
  43.             <scope>test</scope>
  44.         </dependency>
  45.     </dependencies>
  46.     <build>
  47.         <plugins>
  48.             <plugin>
  49.                 <groupId>org.springframework.boot</groupId>
  50.                 <artifactId>spring-boot-maven-plugin</artifactId>
  51.                 <configuration>
  52.                     <excludes>
  53.                         <exclude>
  54.                             <groupId>org.projectlombok</groupId>
  55.                             <artifactId>lombok</artifactId>
  56.                         </exclude>
  57.                     </excludes>
  58.                 </configuration>
  59.             </plugin>
  60.         </plugins>
  61.     </build>
  62. </project>
复制代码

  • YAML文件配置
  1. spring:
  2.   redis:
  3.     host: 192.168.46.128
  4.     port: 6379
  5.     password: "072413"
  6.     jedis:
  7.       pool:
  8.         max-active: 100 # 最大连接数
  9.         max-idle: 100 # 最大空闲数
  10.         min-idle: 0 # 最小空闲数
  11.         max-wait: 5 # 最大链接等待时间 单位:ms
复制代码

  • 测试:其他如Hash、List、Set、SortedSet的方法和下面String差不多
  1. package com.zixieqing.springdataredis;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.boot.test.context.SpringBootTest;
  4. import org.springframework.data.redis.core.RedisTemplate;
  5. import javax.annotation.Resource;
  6. import java.util.ArrayList;
  7. import java.util.Objects;
  8. import java.util.concurrent.TimeUnit;
  9. @SpringBootTest(classes = App.class)
  10. class ApplicationTests {
  11.     @Resource
  12.     private RedisTemplate<String, Object> redisTemplate;
  13.     /**
  14.      * SpringDataRedis操作redis:String类型  其他类型都是同理操作
  15.      *
  16.      * String:opsForValue
  17.      * Hash:opsForHash
  18.      * List:opsForList
  19.      * Set:opsForSet
  20.      * SortedSet:opsForZSet
  21.      */
  22.     @Test
  23.     void stringTest() {
  24.         // 添加key-value
  25.         redisTemplate.opsForValue().set("name", "紫邪情");
  26.         // 根据key获取value
  27.         String getName = Objects.requireNonNull(redisTemplate.opsForValue().get("name")).toString();
  28.         System.out.println("getName = " + getName);
  29.         // 添加key-value 并 指定有效期
  30.         redisTemplate.opsForValue().set("job", "Java", 10L, TimeUnit.SECONDS);
  31.         String getJob = Objects.requireNonNull(redisTemplate.opsForValue().get("job")).toString();
  32.         System.out.println("getJob = " + getJob);
  33.         // 就是 setnx 命令,key不存在则添加,存在则不执行
  34.         redisTemplate.opsForValue().setIfAbsent("city", "杭州");
  35.         redisTemplate.opsForValue().setIfAbsent("info", "脸皮厚,欠揍", 10L, TimeUnit.SECONDS);
  36.         ArrayList<String> keys = new ArrayList<>();
  37.         keys.add("name");
  38.         keys.add("job");
  39.         keys.add("city");
  40.         keys.add("info");
  41.         redisTemplate.delete(keys);
  42.     }
  43. }
复制代码
数据序列化

RedisTemplate可以接收Object类型作为值写入Redis:

只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:

缺点:

  • 可读性差
  • 内存占用较大
Jackson序列化

我们可以自定义RedisTemplate的序列化方式,代码如下:
  1. package com.zixieqing.springdataredis.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
  7. import org.springframework.data.redis.serializer.RedisSerializer;
  8. /**
  9. * redis自定义序列化方式
  10. *
  11. * @author : ZiXieqing
  12. */
  13. @Configuration
  14. public class RedisSerializeConfig {
  15.     @Bean
  16.     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
  17.         // 创建RedisTemplate对象
  18.         RedisTemplate<String, Object> template = new RedisTemplate<>();
  19.         // 设置连接工厂
  20.         template.setConnectionFactory(connectionFactory);
  21.         // 创建JSON序列化工具
  22.         GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
  23.         // 设置Key的序列化
  24.         template.setKeySerializer(RedisSerializer.string());
  25.         template.setHashKeySerializer(RedisSerializer.string());
  26.         // 设置Value的序列化
  27.         template.setValueSerializer(jsonRedisSerializer);
  28.         template.setHashValueSerializer(jsonRedisSerializer);
  29.         // 返回
  30.         return template;
  31.     }
  32. }
复制代码
这里采用了JSON序列化来代替默认的JDK序列化方式。最终结果如图:

整体可读性有了很大提升,并且能将Java对象自动的序列化为JSON字符串,并且查询时能自动把JSON反序列化为Java对象
不过,其中记录了序列化时对应的class名称,目的是为了查询时实现自动反序列化。这会带来额外的内存开销。
StringRedisTemplate

尽管JSON的序列化方式可以满足我们的需求,但依然存在一些问题

为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。
为了减少内存的消耗,我们可以采用手动序列化的方式,换句话说,就是不借助默认的序列化器,而是我们自己来控制序列化的动作,同时,我们只采用String的序列化器,这样,在存储value时,我们就不需要在内存中多存储数据,从而节约我们的内存空间

这种用法比较普遍,因此SpringDataRedis就提供了RedisTemplate的子类:StringRedisTemplate,它的key和value的序列化方式默认就是String方式


使用示例:
  1. package com.zixieqing.springdataredis;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.zixieqing.springdataredis.entity.User;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.data.redis.core.StringRedisTemplate;
  10. import javax.annotation.Resource;
  11. import java.util.ArrayList;
  12. import java.util.Objects;
  13. import java.util.concurrent.TimeUnit;
  14. @Slf4j
  15. @SpringBootTest(classes = App.class)
  16. class ApplicationTests {
  17.     @Autowired
  18.     private StringRedisTemplate stringRedisTemplate;
  19.     // 是jackson中的
  20.     private final ObjectMapper mapper = new ObjectMapper();
  21.     /**
  22.      * 使用StringRedisTemplate操作Redis 和 序列化与反序列化
  23.      *
  24.      * 操作redis和String类型一样的
  25.      */
  26.     @Test
  27.     void serializeTest() throws JsonProcessingException {
  28.         User user = new User();
  29.         user.setName("zixieqing")
  30.                 .setJob("Java");
  31.         // 序列化
  32.         String userStr = mapper.writeValueAsString(user);
  33.         stringRedisTemplate.opsForValue().set("com:zixieqing:springdataredis:user", userStr);
  34.         // 反序列化
  35.         String userStr2 = stringRedisTemplate.opsForValue().get("com:zixieqing:springdataredis:user");
  36.         User user2 = mapper.readValue(userStr2, User.class);
  37.         log.info("反序列化结果:{}", user2);
  38.     }
  39. }
复制代码

缓存更新策略

缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行淘汰

内存淘汰:redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
超时剔除:当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题
业务场景:先说结论,后面分析这些结论是怎么来的

  • 低一致性需求:使用Redis自带的内存淘汰机制
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案

    • 读操作:

      • 缓存命中则直接返回
      • 缓存未命中则查询数据库,并写入缓存,设定超时时间

    • 写操作:

      • 先写数据库,然后再删除缓存
      • 要确保数据库与缓存操作的原子性(单体系统写库操作和删除缓存操作放入一个事务;分布式系统使用分布式事务管理这二者)


主动更新策略:数据库与缓存不一致问题

由于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的。因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是:
用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等;怎么解决呢?有如下几种方案

Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案。这种由我们自己编写,所以可控,因此此种方式胜出
Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理
Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
Cache Aside 人工编码 解决数据库与缓存不一致

由上一节知道数据库与缓存不一致的解决方案是 Cache Aside 人工编码,但是这个玩意儿需要考虑几个问题:

  • 删除缓存还是更新缓存?

    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多
    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)

  • 如何保证缓存与数据库的操作的同时成功或失败?

    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布式事务方案

  • 先操作缓存还是先操作数据库?

    • 先删除缓存,再操作数据库
    • 先操作数据库,再删除缓存(胜出)

为什么是先操作数据库,再删除缓存?
操作数据库和操作缓存在“串行”情况下没什么太大区别,问题不大,但是:在“并发”情况下,二者就有区别,就会产生数据库与缓存数据不一致的问题
先看“先删除缓存,再操作数据库”:

再看“先操作数据库,再删除缓存”:redis操作几乎是微秒级,所以下图线程1会很快完成,然后线程2业务一般都慢一点,所以缓存中能极快地更新成数据库中的最新数据,因此这种方式虽也会发生数据不一致,但几率很小(数据库操作一般不会在微秒级别内完成)

因此:胜出的是“先操作数据库,再删除缓存”

缓存穿透及解决方式

缓存穿透:指客户端请求的数据在缓存中和数据库中都不存在。这样缓存永远不会生效,这些请求都会打到数据库
场景:如别人模仿id,然后发起大量请求,而这些id对应的数据redis中没有,然后全跑去查库,数据库压力就会增大,导致数据库扛不住而崩掉
解决方式

  • 缓存空对象:就是缓存和数据库中都没有时,直接放个空对象到缓存中,并设置有效期即可

    • 优点:实现简单,维护方便
    • 缺点:

      • 额外的内存消耗
      • 可能造成短期的不一致。一开始redis和数据库都没有,后面新增了数据,而此数据的id可能恰好对上,这样redis中存的这id的数据还是空对象


  • 布隆过滤:采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,用哈希思想去判断当前这个要查询的数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,假设布隆过滤器判断这个数据不存在,则直接返回

    • 优点:内存占用较少,没有多余key
    • 缺点:

      • 实现复杂
      • 存在误判可能。布隆过滤器判断存在,可数据库中不一定真存在,因它采用的是哈希算法,就会产生哈希冲突


  • 增加主键id的复杂度,从而提前做好基础数据校验
  • 做用户权限认证
  • 做热点参数限流
空对象和布隆过滤的架构如下:左为空对象,右为布隆过滤

缓存空对象示例:
  1. @Service
  2. public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
  3.     @Resource
  4.     private StringRedisTemplate stringRedisTemplate;
  5.     @Override
  6.     public Result findShopById(Long id) {
  7.         String cacheKey = CACHE_SHOP_KEY + id;
  8.         // 查 redis
  9.         Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(cacheKey);
  10.         // 有则返回 同时需要看是否命中的是:空对象
  11.         if (!shopMap.isEmpty()) {
  12.             return Result.ok(JSONUtil.toJsonStr(shopMap));
  13.         }
  14.         // 无则查库
  15.         Shop shop = getById(id);
  16.         // 库中无
  17.         if (null == shop) {
  18.             // 向 redis 中放入 空对象,且设置有效期
  19.             Map<String, String> hashMap = new HashMap<>(16);
  20.             hashMap.put("", "");
  21.             stringRedisTemplate.opsForHash().putAll(cacheKey, hashMap);
  22.             // CACHE_NULL_TTL = 2L
  23.             stringRedisTemplate.expire(cacheKey, CACHE_NULL_TTL, TimeUnit.MINUTES);
  24.             
  25.             return Result.fail("商铺不存在");
  26.         }
  27.         // 库中有        BeanUtil 使用的是hutool工具
  28.         // 这步意思:因为Shop实例类中字段类型不是均为String,因此需要将字段值转成String,否则存入Redis时会发生 造型异常
  29.         Map<String, Object> shopMapData = BeanUtil.beanToMap(shop, new HashMap<>(16),
  30.                 CopyOptions.create()
  31.                         .ignoreNullValue()
  32.                         .setIgnoreError(false)
  33.                         .setFieldValueEditor((filedKey, filedValue) -> filedValue = filedValue + "")
  34.         );
  35.         // 写入 redis
  36.         stringRedisTemplate.opsForHash().putAll(cacheKey, shopMapData);
  37.         // 设置有效期    CACHE_SHOP_TTL = 30L
  38.         stringRedisTemplate.expire(cacheKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
  39.         // 返回客户端
  40.         return Result.ok(JSONUtil.toJsonStr(shop));
  41.     }
  42. }
复制代码
缓存雪崩及解决方式

缓存雪崩:指在同一时段大量的缓存key同时失效 或 Redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存
缓存击穿及解决方式

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击

常见的解决方案有两种:

  • 互斥锁
  • 逻辑过期

互斥锁 - 保一致

互斥锁:保一致性,会让线程阻塞,有死锁风险
本质:利用了String的setnx指令;key不存在则添加,存在则不操作

示例:下列逻辑该封装则封装即可
  1. public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
  2.     @Resource
  3.     private StringRedisTemplate stringRedisTemplate;
  4.     @Override
  5.     public Result queryShopById(Long id) {
  6.         String cacheKey = CACHE_SHOP_KEY + id;
  7.         // 查 redis
  8.         Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(cacheKey);
  9.         // redis 中有责返回
  10.         if (!shopMap.isEmpty()) {
  11.             Shop shop = BeanUtil.fillBeanWithMap(shopMap, new Shop(), false);
  12.             return Result.ok(shop);
  13.         }
  14.         Shop shop = null;
  15.         try {
  16.             // 无则获取 互斥锁
  17.             Boolean res = stringRedisTemplate
  18.                     .opsForValue()
  19.                     .setIfAbsent(LOCK_SHOP_KEY + id, UUID.randomUUID().toString(true), LOCK_SHOP_TTL, TimeUnit.MINUTES);
  20.             boolean flag = BooleanUtil.isTrue(res);
  21.             // 获取失败则等一会儿再试
  22.             if (!flag) {
  23.                 Thread.sleep(20);
  24.                 return queryShopById(id);
  25.             }
  26.             // 获取锁成功则查 redis 此时有没有,从而减少缓存重建
  27.             Map<Object, Object> shopMa = stringRedisTemplate.opsForHash().entries(cacheKey);
  28.             // redis 中有责返回
  29.             if (!shopMa.isEmpty()) {
  30.                 shop = BeanUtil.fillBeanWithMap(shopMa, new Shop(), false);
  31.                 return Result.ok(shop);
  32.             }
  33.             // 有则返回,无则查库
  34.             shop = getById(id);
  35.             // 库中无
  36.             if (null == shop) {
  37.                 // 向 redis放入 空值,并设置有效期
  38.                 Map<String, String> hashMap = new HashMap<>(16);
  39.                 hashMap.put("", "");
  40.                 stringRedisTemplate.opsForHash().putAll(cacheKey, hashMap);
  41.                 stringRedisTemplate.expire(cacheKey, 2L, TimeUnit.MINUTES);
  42.                 return Result.fail("无此数据");
  43.             }
  44.             // 库中有则写入 redis,并设置有效期
  45.             Map<String, Object> sMap = BeanUtil.beanToMap(shop, new HashMap<>(16),
  46.                     CopyOptions.create()
  47.                             .ignoreNullValue()
  48.                             .setIgnoreError(false)
  49.                             .setFieldValueEditor((filedKey, filedValue) -> filedValue = filedValue + "")
  50.             );
  51.             stringRedisTemplate.opsForHash().putAll(cacheKey, sMap);
  52.             stringRedisTemplate.expire(cacheKey, CACHE_SHOP_TTL, TimeUnit.MINUTES);
  53.         } catch (InterruptedException e) {
  54.             throw new RuntimeException(e);
  55.         } finally {
  56.             // 释放锁
  57.             stringRedisTemplate.delete(LOCK_SHOP_KEY + id);
  58.         }
  59.         //返回客户端
  60.         return Result.ok(shop);
  61.     }
  62. }
复制代码
逻辑过期 - 保性能

这玩意儿在互斥锁的基础上再变动一下即可
逻辑过期:不保一致性,性能好,有额外内存消耗,会造成短暂的数据不一致
本质:数据不过期,一直在Redis中,只是程序员自己使用过期字段和当前时间来判定是否过期,过期则获取“互斥锁”,获取锁成功(此时可以再判断一下Redis中的数据是否过期,减少缓存重建),则开线程重建缓存即可

示例:
  1. @Data
  2. @Accessors(chain = true)
  3. public class RedisData {
  4.     private LocalDateTime expireTime;
  5.     private Object data;
  6. }
复制代码
  1. @Service
  2. public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
  3.     @Resource
  4.     private StringRedisTemplate stringRedisTemplate;
  5.     private static final ExecutorService EXECUTORS = Executors.newFixedThreadPool(10);
  6.     @Override
  7.     public Result queryShopById(Long id) {
  8.         // 使用互斥锁解决 缓存击穿
  9.         // return cacheBreakDownWithMutex(id);
  10.         String cacheKey = CACHE_SHOP_KEY + id;
  11.         // 查 redis
  12.         String shopJson = stringRedisTemplate.opsForValue().get(cacheKey);
  13.         // redis 中没有则报错(理论上是一直存在redis中的,逻辑过期而已,所以这一步不用判断都可以)
  14.         if (StrUtil.isBlank(shopJson)) {
  15.             return Result.fail("无此数据");
  16.         }
  17.         // redis 中有,则看是否过期
  18.         RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
  19.         LocalDateTime expireTime = redisData.getExpireTime();
  20.         Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
  21.         // 没过期,直接返回数据
  22.         if (expireTime.isAfter(LocalDateTime.now())) {
  23.             return Result.ok(shop);
  24.         }
  25.         try {
  26.             // 获取互斥锁    LOCK_SHOP_TTL = 10L
  27.             Boolean res = stringRedisTemplate
  28.                     .opsForValue()
  29.                     .setIfAbsent(LOCK_SHOP_KEY + id, UUID.randomUUID().toString(true),
  30.                             LOCK_SHOP_TTL, TimeUnit.SECONDS);
  31.             boolean flag = BooleanUtil.isTrue(res);
  32.             // 获取锁失败则眯一会儿再尝试
  33.             if (!flag) {
  34.                 Thread.sleep(20);
  35.                 return queryShopById(id);
  36.             }
  37.             // 获取锁成功
  38.             // 再看 redis 中的数据是否过期,减少缓存重建
  39.             shopJson = stringRedisTemplate.opsForValue().get(cacheKey);
  40.             redisData = JSONUtil.toBean(shopJson, RedisData.class);
  41.             expireTime = redisData.getExpireTime();
  42.             shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
  43.             // 已过期
  44.             if (expireTime.isBefore(LocalDateTime.now())) {
  45.                 EXECUTORS.submit(() -> {
  46.                     // 重建缓存
  47.                     this.buildCache(id, 20L);
  48.                 });
  49.             }
  50.         } catch (InterruptedException e) {
  51.             throw new RuntimeException(e);
  52.         } finally {
  53.             // 释放锁
  54.             stringRedisTemplate.delete(LOCK_SHOP_KEY + id);
  55.         }
  56.         // 返回客户端
  57.         return Result.ok(shop);
  58.     }
  59.    
  60.     /**
  61.      * 重建缓存
  62.      */
  63.     public void buildCache(Long id, Long expireTime) {
  64.         String key = LOCK_SHOP_KEY + id;
  65.         // 重建缓存
  66.         Shop shop = getById(id);
  67.         if (null == shop) {
  68.             // 库中没有则放入 空对象
  69.             stringRedisTemplate.opsForValue().set(key, "", 10L, TimeUnit.SECONDS);
  70.         }
  71.         RedisData redisData = new RedisData();
  72.         redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime))
  73.                 .setData(shop);
  74.         stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
  75.     }
  76. }
复制代码
简单认识Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性
基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
Redis提供的调用函数,语法如下:
  1. redis.call('命令名称', 'key', '其它参数', ...)
复制代码
如:先执行set name Rose,再执行get name,则脚本如下:
  1. # 先执行 set name jack
  2. redis.call('set', 'name', 'Rose')
  3. # 再执行 get name
  4. local name = redis.call('get', 'name')
  5. # 返回
  6. return name
复制代码
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

例如,我们要执行 redis.call('set', 'name', 'jack') 这个脚本,语法如下:

如果脚本中的key、value不想写死,可以作为参数传递
key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

Java+Redis调用Lua脚本

RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股

示例:
  1. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  2.     static {
  3.         // 搞出脚本对象        DefaultRedisScript是RedisTemplate的实现类
  4.         UNLOCK_SCRIPT = new DefaultRedisScript<>();
  5.         // 脚本在哪个旮旯地方
  6.         UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  7.         // 返回值类型
  8.         UNLOCK_SCRIPT.setResultType(Long.class);
  9.     }
  10. public void unlock() {
  11.     // 调用lua脚本
  12.     stringRedisTemplate.execute(
  13.             UNLOCK_SCRIPT,        // lua脚本
  14.             Collections.singletonList(KEY_PREFIX + name),        // 对应key参数的数值:KEYS数组
  15.             ID_PREFIX + Thread.currentThread().getId());        // 对应其他参数的数值:ARGV数组
  16. }
复制代码
Redisson

官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
分布式锁需要解决几个问题:而下图的问题可以通过Redisson这个现有框架解决

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

使用Redisson


  • 依赖2
  1. <dependency>
  2.         <groupId>org.redisson</groupId>
  3.         <artifactId>redisson</artifactId>
  4.         <version>3.13.6</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.redisson</groupId>
  8.     <artifactId>redisson-spring-boot-starter</artifactId>
  9.     <version>3.13.6</version>
  10. </dependency>
复制代码

  • 创建redisson客户端
YAML配置:常用参数戳这里
  1. spring:
  2.   application:
  3.     name: springboot-redisson
  4.   redis:
  5.     redisson:
  6.       config: |
  7.         singleServerConfig:
  8.           password: "redis服务密码"
  9.           address: "redis:/redis服务ip:6379"
  10.           database: 1
  11.         threads: 0
  12.         nettyThreads: 0
  13.         codec: !<org.redisson.codec.FstCodec> {}
  14.         transportMode: "NIO"
复制代码
代码配置
  1. import org.redisson.Redisson;
  2. import org.redisson.api.RedissonClient;
  3. import org.redisson.config.Config;
  4. import org.redisson.config.SingleServerConfig;
  5. import org.redisson.config.TransportMode;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class RedissonConfig {
  10.     @Bean
  11.     public RedissonClient redissonClient(){
  12.         Config config = new Config();
  13.         config.setTransportMode(TransportMode.NIO);
  14.         // 添加redis地址,这里添加了单点的地址,也可以使用 config.useClusterServers() 添加集群地址
  15.         config.useSingleServer()
  16.             .setAddress("redis://redis服务ip:6379")
  17.             .setPassword("redis密码");
  18.         
  19.         // 创建RedissonClient对象
  20.         return Redisson.create(config);
  21.     }
  22. }
复制代码

  • 使用redisson客户端
  1. @Resource
  2. private RedissionClient redissonClient;
  3. @Test
  4. void testRedisson() throws Exception{
  5.     // 获取锁(可重入),指定锁的名称
  6.     RLock lock = redissonClient.getLock("lockName");
  7.     /*
  8.      * 尝试获取锁
  9.      *
  10.      * 参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
  11.      */
  12.     boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
  13.     // 判断获取锁成功
  14.     if(isLock){
  15.         try{
  16.             System.out.println("执行业务");         
  17.         }finally{
  18.             //释放锁
  19.             lock.unlock();
  20.         }
  21.         
  22.     }
  23. }
复制代码
Redisson 可重入锁原理


  • 采用Hash结构
  • key为锁名
  • field为线程标识
  • value就是一个计数器count,同一个线程再来获取锁就让此值 +1,同线程释放一次锁此值 -1

    • PS:Java中使用的是state,C语言中用的是count,作用差不多

源码在:lock.tryLock(waitTime, leaseTime, TimeUnit)中,leaseTime这个参数涉及到WatchDog机制,所以可以直接看 lock.tryLock(waitTime, TimeUnit) 这个的源码

核心点在里面的lua脚本中:
  1. "if (redis.call('exists', KEYS[1]) == 0) then " +        -- KEYS[1] : 锁名称                判断锁是否存在
  2.         -- ARGV[2] = id + ":" + threadId                锁的小key        充当 field
  3.         "redis.call('hset', KEYS[1], ARGV[2], 1); " +        -- 当前这把锁不存在则添加锁,value=count=1 是hash结构
  4.         "redis.call('pexpire', KEYS[1], ARGV[1]); " +        -- 并给此锁设置有效期
  5.         "return nil; " +        -- 获取锁成功,返回nil,即:null
  6. "end; " +
  7. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +        -- 判断 key+field 是否存在。即:判断是否是同一线程来获取锁
  8.     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +        -- 是自己,让value +1
  9.     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                -- 给锁重置有效期
  10.     "return nil; " +        -- 成功,返回nil,即:null
  11. "end; " +
  12. "return redis.call('pttl', KEYS[1]);"        -- 获取锁失败(含失效),返回锁的TTL有效期
复制代码
Redission 锁重试 和 WatchDog机制

看源码时选择:RedissonLock
锁重试

这里的 tryLock(long waitTime, long leaseTime, TimeUnit unit)选择的是带参的,无参的 tryLock()是,默认不会重试的
  1. public class RedissonLock extends RedissonExpirable implements RLock {
  2.     @Override
  3.     public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  4.         // 将 waitTime 最大等待时间转成 毫秒
  5.         long time = unit.toMillis(waitTime);
  6.         // 获取此时的毫秒值
  7.         long current = System.currentTimeMillis();
  8.         // 获取当前线程ID
  9.         long threadId = Thread.currentThread().getId();
  10.         // 抢锁逻辑:涉及到WatchDog,待会儿再看
  11.         Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  12.         // lock acquired        表示在上一步 tryAcquire() 中抢锁成功
  13.         if (ttl == null) {
  14.             return true;
  15.         }
  16.         
  17.         // 有获取当前时间
  18.         time -= System.currentTimeMillis() - current;
  19.         // 看执行上面的逻辑之后,是否超出了waitTime最大等待时间
  20.         if (time <= 0) {
  21.             // 超出waitTime最大等待时间,则获取锁失败
  22.             acquireFailed(waitTime, unit, threadId);
  23.             return false;
  24.         }
  25.         // 再精确时间,看经过上面逻辑之后,是否超出waitTime最大等待时间
  26.         current = System.currentTimeMillis();
  27.         // 订阅        发布逻辑是在 lock.unLock() 的逻辑中,里面有一个lua脚本,使用了 publiser 命令
  28.         RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  29.         // 若订阅在waitTime最大等待时间内未完成,即超出waitTime最大等待时间
  30.         if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
  31.             // 同时订阅也未取消
  32.             if (!subscribeFuture.cancel(false)) {
  33.                 subscribeFuture.onComplete((res, e) -> {
  34.                     if (e == null) {
  35.                         // 则取消订阅
  36.                         unsubscribe(subscribeFuture, threadId);
  37.                     }
  38.                 });
  39.             }
  40.             // 订阅在waitTime最大等待时间内未完成,则获取锁失败
  41.             acquireFailed(waitTime, unit, threadId);
  42.             return false;
  43.         }
  44.         try {
  45.             // 继续精确时间,经过上面逻辑之后,是否超出waitTime最大等待时间
  46.             time -= System.currentTimeMillis() - current;
  47.             if (time <= 0) {
  48.                 acquireFailed(waitTime, unit, threadId);
  49.                 return false;
  50.             }
  51.         
  52.             // 还在waitTime最大等待时间内                这里面就是重试的逻辑
  53.             while (true) {
  54.                 long currentTime = System.currentTimeMillis();
  55.                 // 抢锁
  56.                 ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  57.                 // lock acquired        获取锁成功
  58.                 if (ttl == null) {
  59.                     return true;
  60.                 }
  61.                 time -= System.currentTimeMillis() - currentTime;
  62.                 if (time <= 0) {        // 经过上面逻辑之后,时间已超出waitTime最大等待时间,则获取锁失败
  63.                     acquireFailed(waitTime, unit, threadId);
  64.                     return false;
  65.                 }
  66.                 // waiting for message
  67.                 currentTime = System.currentTimeMillis();
  68.                 // 未失效 且 失效期还在 waitTime最大等待时间 以内
  69.                 if (ttl >= 0 && ttl < time) {
  70.                     // 同时该进程也未被中断,则通过该信号量继续获取锁
  71.                     subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  72.                 } else {
  73.                     // 否则处于任务调度的目的,从而禁用当前线程,让其处于休眠状态
  74.                     // 除非其他线程调用当前线程的 release方法 或 当前线程被中断 或 waitTime已过
  75.                     subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  76.                 }
  77.                 time -= System.currentTimeMillis() - currentTime;
  78.                 if (time <= 0) {
  79.                     // 还未获取锁成功,那就真的是获取锁失败了
  80.                     acquireFailed(waitTime, unit, threadId);
  81.                     return false;
  82.                 }
  83.             }
  84.         } finally {
  85.             // 取消订阅
  86.             unsubscribe(subscribeFuture, threadId);
  87.         }
  88. //        return get(tryLockAsync(waitTime, leaseTime, unit));
  89.     }
  90. }
复制代码
Redis持久化

分为两种:RDB和AOF

Redis的持久化虽然可以保证数据安全,但也会带来很多额外的开销,因此持久化请遵循下列建议:

  • 用来做缓存的Redis实例尽量不要开启持久化功能
  • 建议关闭RDB持久化功能,使用AOF持久化
  • 利用脚本定期在slave节点做RDB,实现数据备份
  • 设置合理的rewrite阈值,避免频繁的bgrewrite
  • [不是绝对,依情况而行]配置no-appendfsync-on-rewrite = yes,禁止在rewrite / fork期间做aof,避免因AOF引起的阻塞
部署有关建议:

  • Redis实例的物理机要预留足够内存,应对fork和rewrite
  • 单个Redis实例内存上限不要太大,如4G或8G。可以加快fork的速度、减少主从同步、数据迁移压力
  • 不要与CPU密集型应用部署在一起。如:一台虚拟机中部署了很多应用
  • 不要与高硬盘负载应用一起部署。例如:数据库、消息队列.........,这些应用会不断进行磁盘IO
RDB 持久化

RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。里面的内容是二进制。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件,默认是保存在当前运行目录(redis.conf的dir配置)
RDB持久化在四种情况下会执行::

  • save命令:save命令会导致主进程执行RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。执行下面的命令,可以立即执行一次RDB


  • bgsave命令:这个命令执行后会开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响。下面的命令可以异步执行RDB


  • Redis停机时:Redis停机时会执行一次save命令,实现RDB持久化


  • 触发RDB条件:redis.conf文件中配置的条件满足时就会触发RDB,如下列样式
  1. import org.junit.jupiter.api.Test;
  2. import org.redisson.api.RLock;
  3. import org.redisson.api.RedissonClient;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import javax.annotation.Resource;
  6. import java.util.concurrent.TimeUnit;
  7. @SpringBootTest
  8. class ApplicationTests {
  9.     @Resource
  10.     private RedissonClient redissonClient;
  11.     /**
  12.      * 伪代码
  13.      */
  14.     @Test
  15.     void buildCache() throws InterruptedException {
  16.         // 获取 锁名字
  17.         RLock lock = redissonClient.getLock("");
  18.         // 获取锁
  19.         boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
  20.         // 释放锁
  21.         lock.unlock();
  22.     }
  23. }
复制代码
RDB的其它配置也可以在redis.conf文件中设置:
  1. public class RedissonLock extends RedissonExpirable implements RLock {
  2.         protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  3.         
  4.         return evalWriteAsync(
  5.             getName(),
  6.             LongCodec.INSTANCE,
  7.             RedisCommands.EVAL_BOOLEAN,
  8.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  9.                     "return nil;" +
  10.             "end; " +
  11.             
  12.             "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  13.             
  14.             "if (counter > 0) then " +
  15.                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  16.                     "return 0; " +
  17.             "else " +
  18.                     "redis.call('del', KEYS[1]); " +
  19.                     "redis.call('publish', KEYS[2], ARGV[1]); " +        // 发布,从而在上面的 重试 中进行订阅
  20.                     "return 1; " +
  21.             "end; " +
  22.             
  23.             "return nil;",
  24.             Arrays.asList(
  25.                         getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE,
  26.                             internalLockLeaseTime, getLockName(threadId)
  27.                 );
  28.     }
  29. }
复制代码
RDB的原理

RDB方式bgsave的基本流程?

  • fork主进程得到一个子进程,共享内存空间
  • 子进程读取内存数据并写入新的RDB文件
  • 用新RDB文件替换旧的RDB文件
fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

所以1可以得知:RDB的缺点

  • RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
  • fork子进程、压缩、写出RDB文件都比较耗时
AOF 持久化

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件

AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
  1. // 抢锁逻辑:涉及到WatchDog,待会儿再看
  2. Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
复制代码
AOF的命令记录的频率也可以通过redis.conf文件来配:
  1. public class RedissonLock extends RedissonExpirable implements RLock {
  2.     private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  3.         return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  4.     }
  5.     /**
  6.      * 异步获取锁
  7.      */
  8.     private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  9.         // leaseTime到期时间 等于 -1 否,此值决定着是否开启watchDog机制
  10.         if (leaseTime != -1) {
  11.             return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  12.         }
  13.         RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
  14.             waitTime,
  15.             /*
  16.              * getLockWatchdogTimeout() 就是获取 watchDog 时间,即:
  17.              *                 private long lockWatchdogTimeout = 30 * 1000;
  18.              */
  19.             commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
  20.             TimeUnit.MILLISECONDS,
  21.             threadId,
  22.             RedisCommands.EVAL_LONG
  23.         );
  24.         // 上一步ttlRemainingFuture异步执行完时
  25.         ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  26.             // 若出现异常了,则说明获取锁失败,直接滚犊子了
  27.             if (e != null) {
  28.                 return;
  29.             }
  30.             // lock acquired        获取锁成功
  31.             if (ttlRemaining == null) {
  32.                 // 过期了,则重置到期时间,进入这个方法瞄一下
  33.                 scheduleExpirationRenewal(threadId);
  34.             }
  35.         });
  36.         return ttlRemainingFuture;
  37.     }
  38.    
  39.    
  40.    
  41.     /**
  42.      * 重新续约到期时间
  43.      */
  44.         private void scheduleExpirationRenewal(long threadId) {
  45.         ExpirationEntry entry = new ExpirationEntry();
  46.         /*
  47.          * private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
  48.          *
  49.          * getEntryName() 就是 this.entryName = id + ":" + name
  50.          *                         而 this.id = commandExecutor.getConnectionManager().getId()
  51.          *
  52.          * putIfAbsent() key未有value值则进行关联,相当于:
  53.          *                 if (!map.containsKey(key))
  54.          *                        return map.put(key, value);
  55.          *                 else
  56.          *                        return map.get(key);
  57.          */
  58.         ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  59.         if (oldEntry != null) {
  60.             oldEntry.addThreadId(threadId);
  61.         } else {
  62.             entry.addThreadId(threadId);
  63.             // 续订到期                续约逻辑就在这里面
  64.             renewExpiration();
  65.         }
  66.     }
  67.    
  68.    
  69.    
  70.     /**
  71.      * 续约
  72.      */
  73.         private void renewExpiration() {
  74.         ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  75.         if (ee == null) {
  76.             return;
  77.         }
  78.         
  79.         /*
  80.          * 这里 newTimeout(new TimerTask(), 参数2, 参数3) 指的是:参数2,参数3去描述什么时候去做参数1的事情
  81.          * 这里的参数2:internalLockLeaseTime / 3 就是前面的 lockWatchdogTimeout = (30 * 1000) / 3 = 1000ms = 10s
  82.          *
  83.          * 锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,它就去进行续约,把当前这把锁续约成30s,
  84.          * 如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,
  85.          * 完成不停的续约
  86.          */
  87.         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  88.             @Override
  89.             public void run(Timeout timeout) throws Exception {
  90.                 ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  91.                 if (ent == null) {
  92.                     return;
  93.                 }
  94.                 Long threadId = ent.getFirstThreadId();
  95.                 if (threadId == null) {
  96.                     return;
  97.                 }
  98.                
  99.                 // renewExpirationAsync() 里面就是一个lua脚本,脚本中使用 pexpire 指令重置失效时间,
  100.                 // pexpire 此指令是以 毫秒 进行,expire是以 秒 进行
  101.                 RFuture<Boolean> future = renewExpirationAsync(threadId);
  102.                 future.onComplete((res, e) -> {
  103.                     if (e != null) {
  104.                         log.error("Can't update lock " + getName() + " expiration", e);
  105.                         return;
  106.                     }
  107.                     
  108.                     if (res) {
  109.                         // reschedule itself        递归此方法,从而完成不停的续约
  110.                         renewExpiration();
  111.                     }
  112.                 });
  113.             }
  114.         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  115.         
  116.         ee.setTimeout(task);
  117.     }
  118. }
复制代码
三种策略对比:

AOF文件重写

因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。

Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
  1. # 900秒内,如果至少有1个key被修改,则执行bgsave,如果是save "" 则表示禁用RDB
  2. save 900 1  
  3. save 300 10  
  4. save 60 10000
复制代码
主从数据同步原理

全量同步

完整流程描述:先说完整流程,然后再说怎么来的

  • slave节点请求增量同步
  • master节点判断replid,发现不一致,拒绝增量同步
  • master将完整内存数据生成RDB,发送RDB到slave
  • slave清空本地数据,加载master的RDB
  • master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
  • slave执行接收到的命令,保持与master之间的同步
主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程:

master如何得知salve是第一次来连接??
有两个概念,可以作为判断依据:

  • Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
  • offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新
因此slave做数据同步,必须向master声明自己的replication id 和 offset,master才可以判断到底需要同步哪些数据
因为slave原本也是一个master,有自己的replid和offset,当第一次变成slave与master建立连接时,发送的replid和offset是自己的replid和offset
master判断发现slave发送来的replid与自己的不一致,说明这是一个全新的slave,就知道要做全量同步了
master会将自己的replid和offset都发送给这个slave,slave保存这些信息。以后slave的replid就与master一致了。
因此,master判断一个节点是否是第一次同步的依据,就是看replid是否一致
如图:

增量同步

全量同步需要先做RDB,然后将RDB文件通过网络传输个slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步
增量同步:就是只更新slave与master存在差异的部分数据

这种方式会出现失效的情况:原因就在repl_baklog中
repl_baklog原理

master怎么知道slave与自己的数据差异在哪里呢?
这就要说到全量同步时的repl_baklog文件了。
这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖。
repl_baklog中会记录Redis处理过的命令日志及offset,包括master当前的offset,和slave已经拷贝到的offset:

slave与master的offset之间的差异,就是salve需要增量拷贝的数据了。
随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset:

直到数组被填满:

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。
但是,如果slave出现网络阻塞,导致master的offset远远超过了slave的offset:

如果master继续写入新数据,其offset就会覆盖旧的数据,直到将slave现在的offset也覆盖:

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。

主从同步优化

可以从以下几个方面来优化Redis主从集群:

  • 在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO。
  • Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
  • 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
  • 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力

哨兵集群

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复
哨兵的作用


  • 监控:Sentinel 会不断检查您的master和slave是否按预期工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
集群监控原理

Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:

  • 主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线
  • 客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半

集群故障恢复原理

一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:

  • 首先会判断slave节点与master节点断开时间长短,如果超过指定值(redis.conf配置的down-after-milliseconds * 10)则会排除该slave节点
  • 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
  • 若slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
  • 若offset一样,则最后判断slave节点的运行id(redis开启时都会分配一个)大小,越小优先级越高
当选出一个新的master后,该如何实现切换?流程如下:

  • sentinel链接备选的slave节点,让其执行 slaveof no one(翻译:不要当奴隶) 命令,让该节点成为master
  • sentinel给所有其它slave发送 slaveof <masterip> <masterport> 命令,让这些slave成为新master的从节点,开始从新的master上同步数据
  • 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点

搭建哨兵集群



  • 创建目录
  1. # 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
  2. rdbcompression yes
  3. # RDB文件名称
  4. dbfilename dump.rdb  
  5. # 文件保存的路径目录
  6. dir ./
复制代码

  • 在s1目录中创建sentinel.conf文件,编辑如下内容
  1. # 是否开启AOF功能,默认是no
  2. appendonly yes
  3. # AOF文件的名称
  4. appendfilename "appendonly.aof"
复制代码

  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.150.101 7001 2:指定主节点信息

    • mymaster:主节点名称,自定义,任意写
    • 192.168.150.101 7001:主节点的ip和端口
    • 2:选举master时的quorum值


  • 将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):
  1. # 表示每执行一次写命令,立即记录到AOF文件
  2. appendfsync always
  3. # 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
  4. appendfsync everysec
  5. # 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
  6. appendfsync no
复制代码

  • 修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:
  1. # AOF文件比上次文件 增长超过多少百分比则触发重写
  2. auto-aof-rewrite-percentage 100
  3. # AOF文件体积最小多大以上才触发重写
  4. auto-aof-rewrite-min-size 64mb
复制代码

  • 启动
  1. # 进入/tmp目录
  2. cd /tmp
  3. # 创建目录
  4. mkdir s1 s2 s3
复制代码
RedisTemplate的哨兵模式


  • 依赖
  1. port 27001
  2. sentinel announce-ip 192.168.150.101
  3. sentinel monitor mymaster 192.168.150.101 7001 2
  4. sentinel down-after-milliseconds mymaster 5000
  5. sentinel failover-timeout mymaster 60000
  6. dir "/tmp/s1"
复制代码

  • YAML配置哨兵集群
  1. # 方式一:逐个拷贝
  2. cp s1/sentinel.conf s2
  3. cp s1/sentinel.conf s3
  4. # 方式二:管道组合命令,一键拷贝
  5. echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf
复制代码

  • 配置读写分离
  1. sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
  2. sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
复制代码
这个bean中配置的就是读写策略,包括四种:

  • MASTER:从主节点读取
  • MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
  • REPLICA:从slave(replica)节点读取
  • REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master

  • 就可以在需要的地方正常使用RedisTemplate了,如前面玩的StringRedisTemplate
分片集群

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

  • 海量数据存储问题
  • 高并发写的问题
分片集群可解决上述问题,分片集群特征:

  • 集群中有多个master,每个master保存不同数据
  • 每个master都可以有多个slave节点
  • master之间通过ping监测彼此健康状态
  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点
搭建分片集群



  • 创建目录
  1. # 第1个
  2. redis-sentinel s1/sentinel.conf
  3. # 第2个
  4. redis-sentinel s2/sentinel.conf
  5. # 第3个
  6. redis-sentinel s3/sentinel.conf
复制代码

  • 在temp目录下新建redis.conf文件,编辑内容如下:
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
复制代码

  • 将文件拷贝到每个目录下
  1. spring:
  2.   redis:
  3.     sentinel:
  4.       master: mymaster        # 前面哨兵集群搭建时的名字
  5.       nodes:
  6.         - 192.168.150.101:27001        # 这里的ip:port是sentinel哨兵的,而不用关注哨兵监管下的那些节点
  7.         - 192.168.150.101:27002
  8.         - 192.168.150.101:27003
复制代码

  • 将每个目录下的第redis.conf中的6379改为所在目录一致
  1. @Bean
  2. public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
  3.     return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
  4. }
复制代码

  • 启动
  1. # 进入/tmp目录
  2. cd /tmp
  3. # 创建目录
  4. mkdir 7001 7002 7003 8001 8002 8003
复制代码

  • 创建集群:上一步启动了redis,但这几个redis之间还未建立联系,以下命令要求redis版本大于等于5.0
  1. port 6379
  2. # 开启集群功能
  3. cluster-enabled yes
  4. # 集群的配置文件名称,不需要我们创建,由redis自己维护
  5. cluster-config-file /tmp/6379/nodes.conf
  6. # 节点心跳失败的超时时间
  7. cluster-node-timeout 5000
  8. # 持久化文件存放目录
  9. dir /tmp/6379
  10. # 绑定地址
  11. bind 0.0.0.0
  12. # 让redis后台运行
  13. daemonize yes
  14. # 注册的实例ip
  15. replica-announce-ip 192.168.146.100
  16. # 保护模式
  17. protected-mode no
  18. # 数据库数量
  19. databases 1
  20. # 日志
  21. logfile /tmp/6379/run.log
复制代码

  • redis-cli --cluster或者./redis-trib.rb:代表集群操作命令
  • create:代表是创建集群
  • --replicas 1或者--cluster-replicas 1 :指定集群中每个master的副本个数为1,此时节点总数 ÷ (replicas + 1) 得到的就是master的数量。因此节点列表中的前n个就是master,其它节点都是slave节点,随机分配到不同master


  • 查看集群状态
  1. # 进入/tmp目录
  2. cd /tmp
  3. # 执行拷贝
  4. echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf
复制代码

  • 命令行链接集群redis注意点
  1. # 进入/tmp目录
  2. cd /tmp
  3. # 修改配置文件
  4. printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
复制代码
散列插槽

Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到

数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:

  • key中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
  • key中不包含“{}”,整个key都是有效部分
好处:节点挂了,但插槽还在,将插槽分配给健康的节点,那数据就恢复了
如:key是num,那么就根据num计算;如果是{name}num,则根据name计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值

如上:在7001存入name,对name做hash运算,之后对16384取余,得到的5798就是name=zixq要存储的位置,而5798在7002节点中,所以跳入了7002节点;而 set job java 也是同样的道理
利用上述的原理可以做到:将同一类数据固定地保存在同一个Redis实例/节点(即:这一类数据使用相同的有效部分,如key都以{typeId}为前缀)
分片集群下的故障转移

分片集群下,虽然没有哨兵,但是也可以进行故障转移

  • 自动故障转移:master挂了、选一个slave为主........,和前面玩过的主从一样
  • 手动故障转移:后续新增的节点(此时是slave),性能比原有的节点(master)性能好,故而将新节点弄为master
  1. # 进入/tmp目录
  2. cd /tmp
  3. # 一键启动所有服务
  4. printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf
  5. # 查看启动状态
  6. ps -ef | grep redis
  7. # 关闭所有进程
  8. printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown
复制代码
failover命令可以指定三种模式:

  • 缺省:默认的流程,如下图1~6歩(推荐使用)


  • force:省略了对offset的一致性校验
  • takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见
RedisTemplate访问分片集群

RedisTemplate底层同样基于lettuce实现了分片集群的支持、所以和哨兵集群的RedisTemplate一样,区别就是YAML配置

  • 依赖
  1. port 27001
  2. sentinel announce-ip 192.168.150.101
  3. sentinel monitor mymaster 192.168.150.101 7001 2
  4. sentinel down-after-milliseconds mymaster 5000
  5. sentinel failover-timeout mymaster 60000
  6. dir "/tmp/s1"
复制代码

  • YAML配置分片集群
  1. redis-cli -p 7001 cluster nodes
复制代码

  • 配置读写分离
  1. sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
  2. sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
复制代码
这个bean中配置的就是读写策略,包括四种:

  • MASTER:从主节点读取
  • MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
  • REPLICA:从slave(replica)节点读取
  • REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master

  • 就可以在需要的地方正常使用RedisTemplate了,如前面玩的StringRedisTemplate
集群伴随的问题

集群虽然具备高可用特性,能实现自动故障恢复,但是如果使用不当,也会存在一些问题

  • 集群完整性问题:在Redis的默认配置中,如果发现任意一个插槽不可用,则整个集群都会停止对外服务,即就算有一个slot不能用了,那么集群也会不可用,像什么set、get......命令也用不了了,因此开发中,最重要的是可用性,因此修改 redis.conf文件中的内容
  1. # 在新增节点一方执行下列命令,就会让此新增节点与其master节点身份对调
  2. cluster failover
复制代码

  • 集群带宽问题:集群节点之间会不断的互相Ping来确定集群中其它节点的状态。每次Ping携带的信息至少包括


  • 插槽信息
  • 集群状态信息
集群中节点越多,集群状态信息数据量也越大,10个节点的相关信息可能达到1kb,此时每次集群互通需要的带宽会非常高,这样会导致集群中大量的带宽都会被ping信息所占用,这是一个非常可怕的问题,所以我们需要去解决这样的问题
解决途径:

  • 避免大集群,集群节点数不要太多,最好少于1000,如果业务庞大,则建立多个集群。
  • 避免在单个物理机中运行太多Redis实例
  • 配置合适的cluster-node-timeout值

  • lua和事务的问题:这两个都是为了保证原子性,这就要求执行的所有key都落在一个节点上,而集群则会破坏这一点,集群中无法保证lua和事务问题
  • 数据倾斜问题:因为hash_tag(散列插槽中说的hash算的slot值)落在一个节点上,就导致大量数据都在一个redis节点中了
  • 集群和主从选择问题:单体Redis(主从Redis+哨兵)已经能达到万级别的QPS,并且也具备很强的高可用特性。如果主从能满足业务需求的情况下,若非万不得已的情况下,尽量不搭建Redis集群
批处理

单机redis批处理:mxxx命令 与 Pipeline

Mxxx虽然可以批处理,但是却只能操作部分数据类型(String是mset、Hash是hmset、Set是sadd key member.......),因此如果有对复杂数据类型的批处理需要,建议使用Pipeline
注意点:mxxx命令可以保证原子性,一堆命令一起执行;而Pipeline是非原子性的,这是将一堆命令一起发给redis服务器,然后把这些命令放入服务器的一个队列中,最后慢慢执行而已,因此执行命令不一定是一起执行的
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
复制代码
慢查询

Redis执行时耗时超过某个阈值的命令,称为慢查询
慢查询的危害:由于Redis是单线程的,所以当客户端发出指令后,他们都会进入到redis底层的queue来执行,如果此时有一些慢查询的数据,就会导致大量请求阻塞,从而引起报错,所以我们需要解决慢查询问题

慢查询的阈值可以通过配置指定:
slowlog-log-slower-than:慢查询阈值,单位是微秒。默认是10000,建议1000
慢查询会被放入慢查询日志中,日志的长度有上限,可以通过配置指定:
slowlog-max-len:慢查询日志(本质是一个队列)的长度。默认是128,建议1000


  • 临时配置
  1. spring:
  2.   redis:
  3.     cluster:
  4.       nodes:
  5.         - 192.168.146.100:7001        # 和哨兵集群的区别:此集群没有哨兵,这里使用的是分片集群的每个节点的ip:port
  6.         - 192.168.146.100:7002
  7.         - 192.168.146.100:7003
  8.         - 192.168.146.100:8001
  9.         - 192.168.146.100:8002
  10.         - 192.168.146.100:8003
复制代码

  • 永久配置:在redis.conf文件中添加相应内容即可
  1. @Bean
  2. public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
  3.     return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
  4. }
复制代码
查看慢查询


  • 命令方式
  1. # 集群全覆盖                默认值是 yes,改为no即可
  2. cluster-require-full-coverage no
复制代码


  • 客户端工具:不同客户端操作不一样

内存配置

当Redis内存不足时,可能导致Key频繁被删除、响应时间变长、QPS不稳定等问题。当内存使用率达到90%以上时就需要我们警惕,并快速定位到内存占用的原因
redis中的内存划分:
内存占用说明备注数据内存是Redis最主要的部分,存储Redis的键值信息。主要问题是BigKey问题、内存碎片问题内存碎片问题:Redis底层分配并不是这个key有多大,他就会分配多大,而是有他自己的分配策略,比如8,16,20等等,假定当前key只需要10个字节,此时分配8肯定不够,那么他就会分配16个字节,多出来的6个字节就不能被使用,这就是我们常说的 碎片问题。这种一般重启redis就解决了进程内存Redis主进程本身运⾏肯定需要占⽤内存,如代码、常量池等等;这部分内存⼤约⼏兆,在⼤多数⽣产环境中与Redis数据占⽤的内存相⽐可以忽略这部分内存一般都可以忽略不计缓冲区内存一般包括客户端缓冲区、AOF缓冲区、复制缓冲区等。客户端缓冲区又包括输入缓冲区和输出缓冲区两种。这部分内存占用波动较大,不当使用BigKey,可能导致内存溢出一般包括客户端缓冲区、AOF缓冲区、复制缓冲区等。客户端缓冲区又包括输入缓冲区和输出缓冲区两种。这部分内存占用波动较大,所以这片内存也是需要重点分析的内存问题查看内存情况


  • 查看内存分配的情况
  1. @Test
  2. void testPipeline() {
  3.     // 创建管道         Pipeline 此对象基本上可以进行所有的redis操作,如:pipeline.set()、pipeline.hset().....
  4.     Pipeline pipeline = jedis.pipelined();
  5.    
  6.     for (int i = 1; i <= 100000; i++) {
  7.         // 放入命令到管道
  8.         pipeline.set("test:key_" + i, "value_" + i);
  9.         if (i % 1000 == 0) {
  10.             // 每放入1000条命令,批量执行
  11.             pipeline.sync();
  12.         }
  13.     }
  14. }
复制代码

  • 查看key的主要占用情况
  1. @Test
  2. void testMSetInCluster() {
  3.     Map<String, String> map = new HashMap<>(4);
  4.     map.put("name", "Rose");
  5.     map.put("age", "21");
  6.     map.put("sex", "Female");
  7.    
  8.     stringRedisTemplate.opsForValue().multiSet(map);
  9.     List<String> strings = stringRedisTemplate
  10.         .opsForValue()
  11.         .multiGet(Arrays.asList("name", "age", "sex"));
  12.    
  13.     strings.forEach(System.out::println);
  14. }
复制代码
MEMORY STATS查询结果解读
  1. public class RedisAdvancedClusterAsyncCommandsImpl {
  2.     @Override
  3.     public RedisFuture<String> mset(Map<K, V> map) {
  4.         // 算key的slot值,然后key相同的分在一组
  5.         Map<Integer, List<K>> partitioned = SlotHash.partition(codec, map.keySet());
  6.         if (partitioned.size() < 2) {
  7.             return super.mset(map);
  8.         }
  9.         Map<Integer, RedisFuture<String>> executions = new HashMap<>();
  10.         for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
  11.             Map<K, V> op = new HashMap<>();
  12.             entry.getValue().forEach(k -> op.put(k, map.get(k)));
  13.             // 异步发送:即mset()中的逻辑就是并行slot方式
  14.             RedisFuture<String> mset = super.mset(op);
  15.             executions.put(entry.getKey(), mset);
  16.         }
  17.         return MultiNodeExecution.firstOfAsync(executions);
  18.     }
  19. }
复制代码

  • 客户端链接工具查看:不同redis客户端链接工具不一样,操作不一样

解决内存问题

由前面铺垫得知:内存缓冲区常见的有三种

  • 复制缓冲区:主从复制的repl_backlog_buf,如果太小可能导致频繁的全量复制,影响性能。通过replbacklog-size来设置,默认1mb
  • AOF缓冲区:AOF刷盘之前的缓存区域,AOF执行rewrite的缓冲区。无法设置容量上限
  • 客户端缓冲区:分为输入缓冲区和输出缓冲区,输入缓冲区最大1G且不能设置。输出缓冲区可以设置
其他的基本上可以忽略,最关键的其实是客户端缓冲区的问题:
客户端缓冲区:指的就是我们发送命令时,客户端用来缓存命令的一个缓冲区,也就是我们向redis输入数据的输入端缓冲区和redis向客户端返回数据的响应缓存区
输入缓冲区最大1G且不能设置,所以这一块我们根本不用担心,如果超过了这个空间,redis会直接断开,因为本来此时此刻就代表着redis处理不过来了,我们需要担心的就是输出端缓冲区

我们在使用redis过程中,处理大量的big value,那么会导致我们的输出结果过多,如果输出缓存区过大,会导致redis直接断开,而默认配置的情况下, 其实它是没有大小的,这就比较坑了,内存可能一下子被占满,会直接导致咱们的redis断开,所以解决方案有两个

  • 设置一个大小
  • 增加我们带宽的大小,避免我们出现大量数据从而直接超过了redis的承受能力
原理篇

涉及的Redis源码采用的版本为redis-6.2.6
Redis数据结构:SDS

Redis是C语言实现的,但C语言本身的字符串有缺陷,因此Redis就自己弄了一个字符串SDS(Simple Dynamic String 简单动态字符串)
C语言本身的字符串缺陷如下:

  • 二级制不安全
C 语⾔的字符串其实就是⼀个字符数组,即数组中每个元素是字符串中的⼀个字符

为什么最后⼀个字符是“\0”?
在 C 语⾔⾥,对字符串操作时,char * 指针只是指向字符数组的起始位置,⽽字符数组的结尾位置就⽤“\0”表示,意思是指字符串的结束
所以,C 语⾔标准库中的字符串操作函数就通过判断字符是不是 “\0” 来决定要不要停⽌操作,如果当前字符不是 “\0” ,说明字符串还没结束,可以继续操作,如果当前字符是 “\0” 是则说明字符串结束了,就要停⽌操作
因此,C 语⾔字符串⽤ “\0” 字符作为结尾标记有个缺陷。假设有个字符串中有个 “\0” 字符,这时在操作这个字符串时就会提早结束
故C语言字符串缺陷:字符串⾥⾯不能含有 “\0” 字符,否则最先被程序读⼊的 “\0” 字符将被误认为是字符串结尾,这个限制使得 C 语⾔的字符串只能保存⽂本数据,不能保存像图⽚、⾳频、视频文件这样的⼆进制数据;同时C 语⾔获取字符串⻓度的时间复杂度是 O(N)

  • 字符串操作函数不⾼效且不安全(可能导致发⽣缓冲区溢出)
举个例⼦,strcat 函数是可以将两个字符串拼接在⼀起
  1. config set slowlog-log-slower-than 1000                # 临时配置:慢查询阈值,重启redis则失效
  2. config set slowlog-max-len 1000                                # 慢查询日志长度
复制代码
strcat 函数和 strlen 函数类似,时间复杂度也很⾼,也都需要先通过遍历字符串才能得到⽬标字符串的末尾。对于 strcat 函数来说,还要再遍历源字符串才能完成追加,对字符串的操作效率不⾼
C 语⾔的字符串是不会记录⾃身的缓冲区⼤⼩的,所以 strcat 函数假定程序员在执⾏这个函数时,已经为dest 分配了⾜够多的内存,可以容纳 src 字符串中的所有内容,⽽⼀旦这个假定不成⽴,就会发⽣缓冲区溢出将可能会造成程序运⾏终⽌
SDS 结构体

Redis源码:官网下载redis x.x.x.tar.gz,然后解压,src目录下即为源码所在
Redis中SDS是一个结构体,源码如下:


  • len,记录了字符串⻓度。这样获取字符串⻓度的时候,只需要返回这个成员变量值就⾏,时间复杂度只需要 O(1)
  • alloc,分配给字符数组的空间⻓度。这样在修改字符串的时候,可以通过 alloc - len 计算出剩余的空间⼤⼩,可以⽤来判断空间是否满⾜修改需求,如果不满⾜的话,就会⾃动将 SDS 的空间扩展⾄执⾏修改所需的⼤⼩,然后才执⾏实际的修改操作,所以使⽤ SDS 既不需要⼿动修改 SDS 的空间⼤⼩,也不会出现前⾯所说的缓冲区溢出的问题。
  • flags,⽤来表示不同类型的 SDS。⼀共设计了 5 种类型,分别是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64
  • buf[],字符数组,⽤来保存实际数据。不仅可以保存字符串,也可以保存⼆进制数据

  • 解决C语言原来字符串效率不高问题
Redis 的 SDS 结构因为加⼊了 len 成员变量,那么获取字符串⻓度的时候,直接返回这个成员变量的值就⾏,所以复杂度只有 O1

  • 解决二进制不安全问题
SDS 不需要⽤ “\0” 字符来标识字符串结尾了,⽽是有个专⻔的 len 成员变量来记录⻓度,所以可存储包含 “\0” 的数据。但是 SDS 为了兼容部分 C 语⾔标准库的函数, SDS 字符串结尾还是会加上 “\0” 字符
因此, SDS 的 API 都是以处理⼆进制的⽅式来处理 SDS 存放在 buf[] ⾥的数据,程序不会对其中的数据做任何限制,数据写⼊的时候时什么样的,它被读取时就是什么样的
通过使⽤⼆进制安全的 SDS,⽽不是 C 字符串,使得 Redis 不仅可以保存⽂本数据,也可以保存任意格式的⼆进制数据

  • 解决缓冲区可能溢出问题
Redis 的 SDS 结构⾥引⼊了 alloc 和 len 成员变量,这样 SDS API 通过 alloc - len 计算,可以算出剩余可⽤的空间⼤⼩,这样在对字符串做修改操作的时候,就可以由程序内部判断缓冲区⼤⼩是否⾜够⽤
⽽且,当判断出缓冲区⼤⼩不够⽤时,Redis 会⾃动将扩⼤ SDS 的空间⼤⼩,扩容方式如下:

  • 若新字符串小于1M,则新空间为扩展后字符串长度的两倍 +1
PS:+ 1的原因是“\0”字符

  • 若新字符串大于1M,则新空间为扩展后字符串长度 +1M +1。称为内存预分配
PS:内存预分配好处,下次在操作 SDS 时,如果 SDS 空间够的话,API 就会直接使⽤「未使⽤空间」,⽽⽆须执⾏内存分配,有效的减少内存分配次数

  • 节省内存空间
SDS 结构中有个 flags 成员变量,表示的是 SDS 类型
Redos ⼀共设计了 5 种类型,分别是 sdshdr5、sdshdr8、sdshdr16、sdshdr32 和 sdshdr64
这 5 种类型的主要区别就在于,它们数据结构中的 len 和 alloc 成员变量的数据类型不同
如 sdshdr16 和 sdshdr32 这两个类型,它们的定义分别如下:
  1. # 慢查询阈值
  2. slowlog-log-slower-than 1000
  3. # 慢查询日志长度
  4. slowlog-max-len 1000
复制代码
可以看到:

  • sdshdr16 类型的 len 和 alloc 的数据类型都是 uint16_t,表示字符数组⻓度和分配空间⼤⼩不能超过2 的 16 次⽅
  • sdshdr32 则都是 uint32_t,表示表示字符数组⻓度和分配空间⼤⼩不能超过 2 的 32 次⽅
之所以 SDS 设计不同类型的结构体,是为了能灵活保存不同⼤⼩的字符串,从⽽有效节省内存空间。如,在保存⼩字符串时,结构头占⽤空间也⽐较少
除了设计不同类型的结构体,Redis 在编程上还使⽤了专⻔的编译优化来节省内存空间,即在 struct 声明了 __attribute__ ((packed)) ,它的作⽤是:告诉编译器取消结构体在编译过程中的优化对⻬,按照实际占⽤字节数进⾏对⻬
⽐如,sdshdr16 类型的 SDS,默认情况下,编译器会按照 16 字节对⻬的⽅式给变量分配内存,这意味着,即使⼀个变量的⼤⼩不到 16 个字节,编译器也会给它分配 16 个字节
举个例⼦,假设下⾯这个结构体,它有两个成员变量,类型分别是 char 和 int,如下所示:
  1. slowlog len                                # 查询慢查询日志长度
  2. slowlog get [n]                        # 读取n条慢查询日志
  3. slowlog reset                        # 清空慢查询列表
复制代码
默认情况下,这个结构体⼤⼩计算出来就会是 8

这是因为默认情况下,编译器是使⽤「字节对⻬」的⽅式分配内存,虽然 char 类型只占⼀个字节,但是由于成员变量⾥有 int 类型,它占⽤了 4 个字节,所以在成员变量为 char 类型分配内存时,会分配 4 个字节,其中这多余的 3 个字节是为了字节对⻬⽽分配的,相当于有 3 个字节被浪费掉了。
如果不想编译器使⽤字节对⻬的⽅式进⾏分配内存,可以采⽤了 __attribute__ ((packed)) 属性定义结构体,这样⼀来,结构体实际占⽤多少内存空间,编译器就分配多少空间
Redis数据结构:intset

IntSet是Redis中set集合的一种实现方式,基于整数数组来实现,并且如下特征:

  • Redis会确保Intset中的元素唯一、有序
  • 具备类型升级机制,可以节省内存空间
  • 底层采用二分查找方式来查询
Redis中intset的结构体源码如下:

其中的encoding包含三种模式,表示存储的整数大小不同:

为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:

现在,数组中每个数字都在int16_t的范围内,因此采用的编码方式是INTSET_ENC_INT16,每部分占用的字节大小为:

  • encoding:4字节
  • length:4字节
  • contents:2字节 * 3  = 6字节

假如,现在向其中添加一个数字:50000,这个数字超出了int16_t的范围,intset会自动升级编码方式到合适的大小
以当前案例来说流程如下:

  • 升级编码为INTSET_ENC_INT32, 每个整数占4字节,并按照新的编码方式及元素个数扩容数组
  • 倒序依次将数组中的元素拷贝到扩容后的正确位置。PS:倒序保证数据不乱
  • 将待添加的元素放入数组末尾
  • 最后,将inset的encoding属性改为INTSET_ENC_INT32,将length属性改为4

上述逻辑的源码如下:


  • 问题:intset支持降级操作吗?
不⽀持降级操作,⼀旦对数组进⾏了升级,就会⼀直保持升级后的状态。如:前面已经从INTSET_ENC_INT16(2字节整数)升级到INTSET_ENC_INT32(4字节整数),就算删除50000元素,intset集合的类型也还是INTSET_ENC_INT32类型,不会降级为INTSET_ENC_INT16类型
Redis数据结构:Dict

Redis是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过Dict来实现的。
Dict由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)

哈希表(Dictht)与哈希节点(DictEntry)


  • 哈希节点(DictEntry)


  • 结构⾥不仅包含指向键和值的指针,还包含了指向下⼀个哈希表节点的指针,这个指针可以将多个哈希值相同的键值对链接起来,以此来解决哈希冲突的问题,这就是链式哈希
  • dictEntry 结构⾥键值对中的值是⼀个「联合体 v」定义的,因此,键值对中的值可以是⼀个指向实际值的指针,或者是⼀个⽆符号的 64 位整数或有符号的 64 位整数或double 类的值。这么做的好处是可以节省内存空间,因为当「值」是整数或浮点数时,就可以将值的数据内嵌在 dictEntry
    结构⾥,⽆需再⽤⼀个指针指向实际的值,从⽽节省了内存空间

  • 哈希表(Dictht)

其中:

  • 哈希表大小size初始值为4,而且其值总等于2^n


  • 为什么sizemask = size -1?
因为size总等于2^n,所以size -1就为奇数,这样”key利用hash函数得到的哈希值h & sizemask”做与运算就正好得到的是size的低位,而这个低位值也正好和“哈希值 % 哈希表⼤⼩”取模一样
当我们向Dict添加键值对时,Redis首先根据key计算出hash值(h),然后利用 h & sizemask 来计算元素应该存储到数组中的哪个索引位置
假如存储k1=v1,假设k1的哈希值h =1,则1&3 =1,因此k1=v1要存储到数组角标1位置
假如此时又添加了一个k2=v2,那么就会添加到链表的队首
哈希冲突解决方式:链式哈希


就是每个哈希表节点都有⼀个 next 指针,⽤于指向下⼀个哈希表节点,因此多个哈希表节点可以⽤ next 指针构成⼀个单项链表,被分配到同⼀个哈希桶上的多个节点可以⽤这个单项链表连接起来
不过,链式哈希局限性也很明显,随着链表⻓度的增加,在查询这⼀位置上的数据的耗时就会增加,毕竟链表的查询的时间复杂度是 O(n),需要解决就得扩容
Dict的扩容与收缩

扩容

Dict中的HashTable就是数组结合单向链表的实现,当集合中元素较多时,必然导致哈希冲突增多,链表过长,则查询效率会大大降低
Dict在每次新增键值对时都会检查负载因子(LoadFactor = used/size,即负载因子=哈希表已保存节点数量 / 哈希表大小) ,满足以下两种情况时会触发哈希表扩容:

  • 哈希表的 LoadFactor >= 1;并且服务器没有执行 bgsave或者 bgrewiteaof 等后台进程。也就是没有执⾏ RDB 快照或没有进⾏ AOF 重写的时候
  • 哈希表的 LoadFactor > 5 ;此时说明哈希冲突⾮常严重了,不管有没有有在执⾏ RDB 快照或 AOF重写都会强制执行哈希扩容
源码逻辑如下:

收缩

Dict除了扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactor < 0.1 时,会做哈希表收缩

字典(Dict)


Redis 定义⼀个 dict 结构体,这个结构体⾥定义了两个哈希表(dictht ht[2])
在正常服务请求阶段,插⼊的数据,都会写⼊到「哈希表 1」,此时的「哈希表 2 」 并没有被分配空间(这个哈希表涉及到渐进式rehash)

Dict的渐进式rehash

rehash基本流程


不管是扩容还是收缩,必定会创建新的哈希表,导致哈希表的size和sizemask(sizemask  = size -1)变化,而key的查询与sizemask有关(key通过hash函数计算得到哈希值h,数据存储的角标值 = h & sizemask)。因此必须对哈希表中的每一个key重新计算索引,插入新的哈希表,这个过程称为rehash
过程如下:

  • 计算新hash表的realeSize,值取决于当前要做的是扩容还是收缩:


  • 如果是扩容,则新size为第一个大于等于dict.ht[0].used + 1的2^n
  • 如果是收缩,则新size为第一个大于等于dict.ht[0].used的2^n (不得小于4)

  • 按照新的realeSize申请内存空间,创建dictht,并赋值给dict.ht[1]
  • 设置dict.rehashidx = 0,标示开始rehash
  • 将dict.ht[0]中的每一个dictEntry都rehash到dict.ht[1]
  • 将dict.ht[1]赋值给dict.ht[0],给dict.ht[1]初始化为空哈希表,释放原来的dict.ht[0]的内存
以上过程对于小数据影响小,但是对于大数据来说就有问题了,如果「哈希表 1 」的数据量⾮常⼤,那么在迁移⾄「哈希表 2 」的时候,因为会涉及⼤量的数据拷⻉,此时可能会对 Redis 造成阻塞,⽆法服务其他请求,因此就需要渐进式rehash
渐进式rehash

为了避免 rehash 在数据迁移过程中,因拷⻉数据的耗时,影响 Redis 性能的情况,所以 Redis 采⽤了渐进式 rehash,也就是将数据的迁移的⼯作不再是⼀次性迁移完成,⽽是分多次迁移
Dict的rehash并不是一次性完成的。试想一下,如果Dict中包含数百万的entry,要在一次rehash完成,极有可能导致主线程阻塞。所以Dict的rehash是分多次、渐进式的完成,因此称为渐进式rehash。过程如下:

  • 计算新hash表的realeSize,值取决于当前要做的是扩容还是收缩:


  • 如果是扩容,则新size为第一个大于等于dict.ht[0].used + 1的2^n
  • 如果是收缩,则新size为第一个大于等于dict.ht[0].used的2^n (不得小于4)

  • 按照新的realeSize申请内存空间,创建dictht,并赋值给dict.ht[1]
  • 设置dict.rehashidx = 0,标示开始rehash
  • 将dict.ht[0]中的每一个dictEntry都rehash到dict.ht[1]
  • 每次执行新增、删除、查询、修改操作时,除了执行对应操作之外,还会都检查一下dict.rehashidx是否大于 -1;若是则按顺序将dict.ht[0].table[rehashidx]的entry链表rehash到dict.ht[1],并且将rehashidx++,直至dict.ht[0]的所有资源都rehash到dict.ht[1]
PS:随着处理客户端发起的哈希表操作请求数量越多,最终在某个时间点,会把「哈希表 1 」的所有key-value 迁移到「哈希表 2」

  • 将dict.ht[1]赋值给dict.ht[0],给dict.ht[1]初始化为空哈希表,释放原来的dict.ht[0]的内存
  • 将rehashidx赋值为-1,代表rehash结束
  • 在rehash过程中,新增操作,则直接写入ht[1],查询、修改和删除则会在dict.ht[0]和dict.ht[1]依次查找并执行。这样可以确保ht[0]的数据只减不增,随着rehash最终为空
上述流程动画图如下:

Redis数据结构:ZipList

压缩列表的最⼤特点,就是它被设计成⼀种内存紧凑型的数据结构,占⽤⼀块连续的内存空间,不仅可以利⽤ CPU 缓存,⽽且会针对不同⻓度的数据,进⾏相应编码,这种⽅法可以有效地节省内存开销
但是,压缩列表的缺陷也是有的:

  • 不能保存过多的元素,否则查询效率就会降低;
  • 新增或修改某个元素时,压缩列表占⽤的内存空间需要重新分配,甚⾄可能引发连锁更新的问题
压缩列表是 Redis 为了节约内存⽽开发的,它是由连续内存块组成的顺序型数据结构,有点类似于数组:

属性类型长度用途zlbytesuint32_t4 字节记录整个压缩列表占⽤对内存字节数zltailuint32_t4 字节记录压缩列表尾节点距离压缩列表的起始地址有多少字节,通过这个偏移量,可以确定表尾节点的地址zllenuint16_t2 字节记录了压缩列表包含的节点数量。
最大值为UINT16_MAX (65534),如果超过这个值,此处会记录为65535,但节点的真实数量需要遍历整个压缩列表才能计算得出。entry列表节点不定压缩列表包含的各个节点,节点的长度由节点保存的内容决定。zlenduint8_t1 字节特殊值 0xFF (十进制 255 ),用于标记压缩列表的末端。
在压缩列表中,如果我们要查找定位第⼀个元素和最后⼀个元素,可以通过表头三个字段的⻓度直接定位,复杂度是 O(1)。⽽查找其他元素时,就没有这么⾼效了,只能逐个查找,此时的复杂度就是 O(N)了,因此压缩列表不适合保存过多的元素
ZipList的Entry结构

ZipList 中的Entry并不像普通链表那样记录前后节点的指针,因为记录两个指针要占用16个字节,浪费内存。而是采用了下面的结构:


  • previous_entry_length:前一节点的长度,占1个或5个字节。PS:这个点涉及到“连锁更新”问题

    • 如果前一节点的长度小于254字节,则采用1个字节来保存这个长度值
    • 如果前一节点的长度大于254字节,则采用5个字节来保存这个长度值,第一个字节为0xfe,后四个字节才是真实长度数据

  • encoding:编码属性,记录content的数据类型(字符串还是整数)以及长度,占用1个、2个或5个字节
  • contents:负责保存节点的数据,可以是字符串或整数
ZipList中所有存储长度的数值均采用小端字节序,即低位字节在前,高位字节在后。例如:数值0x1234,采用小端字节序后实际存储值为:0x3412
PS:人的阅读习惯是从左到右,即大端字节序,机器读取数据是反着的,所以采用小端字节序,从而先处理低位,再处理高位
ZipList的Entry中的encoding编码

当我们往压缩列表中插⼊数据时,压缩列表就会根据数据是字符串还是整数,以及数据的⼤⼩,会使⽤不同空间⼤⼩的 previous_entry_length和 encoding 这两个元素保存的信息
previous_entry_length的规则上一节中已经提到了,接下来看看encoding
encoding 属性的空间⼤⼩跟数据是字符串还是整数,以及字符串的⻓度有关:

  • 如果当前节点的数据是整数,则 encoding 会使⽤ 1 字节的空间进⾏编码。
  • 如果当前节点的数据是字符串,根据字符串的⻓度⼤⼩,encoding 会使⽤ 1 字节/2字节/5字节的空间进⾏编码

  • 当前节点的数据是整数
如果encoding是以“11”开始,则证明content是整数,且encoding固定只占用1个字节
编码编码长度整数类型110000001int16_t(2 bytes)110100001int32_t(4 bytes)111000001int64_t(8 bytes)11110000124位有符整数(3 bytes)1111111018位有符整数(1 bytes)1111xxxx1直接在xxxx位置保存数值,范围从0001~1101,减1后结果为实际值

  • 当前节点的数据是字符串
如果encoding是以“00”、“01”或者“10”开头(即整数是11开头,排除这种情况剩下的就是字符串),则证明content是字符串
[table][tr]编码编码长度字符串大小[/tr][tr][td]|00pppppp|[/td][td]1 bytes[/td][td]

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

守听

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表