Redis 事件订阅在 Spring 中的使用

打印 上一主题 下一主题

主题 1683|帖子 1683|积分 5049

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Redis 事件订阅功能

Redis 的 key 事件机制允许客户端订阅接收 Redis 数据事件. 可以接收的事件有

  • 所有影响指定 KEY 的命令
  • 所有 KEY 的 LPUSH 操作
  • 所有 KEY 的过期事件
事件是通过 Redis 的 Pub/Sub 层传递的, 因此实现 Pub/Sub 的客户端都可以使用此功能. Redis 的 Pub/Sub 是 即发即弃(fire and forget)模式, 也就是说, 假如 Pub/Sub 客户端断开再重新连接, 那么在断开期间的所有事件都会丢失. 所以这个功能不适合需要可靠的事件通知的场景.
Redis 事件的命令实现

Redis 开启键空间通知

两种方式

  • redis-client 登录后通过命令设置开启 config set notify-keyspace-events Ex, 服务重启后失效
  • 编辑配置文件 redis.conf, 设置 notify-keyspace-events "Ex", 之后重启服务生效
客户端订阅键空间通知事件

留意 keyevent@0 前后是两个下划线

  • psubscribe __keyevent@*__:*: @后面的第一个*位置表示 Redis 的 0-15个DB, *表示所有DB, 第二个 * 的位置表示要监听的事件, *表示所有事件, 假如要指定过期事件, 可以改为 expired.
  • psubscribe __keyevent@0__:expired: 订阅 DB0 的所有key过期事件
订阅
  1. ./redis-cli -h 192.168.50.128
  2. 192.168.50.128:6379> psubscribe __keyevent@0__:expired
  3. Reading messages... (press Ctrl-C to quit)
  4. 1) "psubscribe"
  5. 2) "__keyevent@0__:expired"
  6. 3) (integer) 1
复制代码
设置一个key和超时时间
  1. 192.168.50.128:6379> setex name 10 Kitty
复制代码
基于key过期事件实现延时队列

Redis 过期事件传的是 key, 是不包含value的, 假如使用redis过期事件机制实现延时队列, key和value 需要另存一份

  • 存 key value
  • 存 key, 设置过期时间, 相当于过期队列
  • 客户端订阅键过期通知
  • 客户端收到 key 过期事件后, 根据 key 读取 value, 处置惩罚业务逻辑
留意

  • key 过期通知事件是在 key 被删除时触发的
  • 在 key 过期前假如主动删除该 key 不会触发过期通知事件.
spring-data-redis 的事件机制

在分布式系统中, 由于 redis 一般会用于跨模块的缓存和临时数据, 因此可以通过 redis 实现分布式的消息传递
spring-data-redis 内建两个 listener, 分别是

  • KeyExpirationEventMessageListener 监听key的过期事件
  • KeyspaceEventMessageListener 监听key的操作事件
监听过期事件

KeyExpirationEventMessageListener 默认监听的是所有key的expired事件
  1. private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
  2. protected void doRegister(RedisMessageListenerContainer listenerContainer) {
  3.     listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
  4. }
复制代码

  • 通过扩展这个类自界说监听器可以省掉将监听器添加到 RedisMessageListenerContainer 的步骤, 由于在 doRegister() 中已经做了添加到容器的处置惩罚
  • addMessageListener 时需要指定topic, __keyevent@*__:expired表示监听所有数据库中所有key的 expired 事件, 可以自界说自己需要监听的事件, 例如

    • __keyevent@0__:expired 仅监听 db0 的过期事件
    • __keyevent@0__:del 仅监听 db0 的删除事件

可以自界说扩展类中覆盖这个方法, 监听特定key, 以及对消息中的key进行业务处置惩罚
  1. @Component
  2. public class CustomKeyExpirationListener extends KeyExpirationEventMessageListener {
  3.     public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
  4.         super(listenerContainer);
  5.     }
  6.    
  7.     protected void doRegister(RedisMessageListenerContainer listenerContainer) {
  8.         listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
  9.     }
  10.     @Override
  11.     public void onMessage(Message message, byte[] pattern) {
  12.         // 得到过期key
  13.         String expiredKey = message.toString();
  14.         // 业务处理
  15.     }
复制代码
监听key操作

KeyspaceEventMessageListener 在初始化时, 会对发布配置进行调整, 并将自己注册到容器中
  1. private String keyspaceNotificationsConfigParameter = "EA";
  2. public void init() {
  3.     if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
  4.         RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
  5.         try {
  6.             Properties config = connection.getConfig("notify-keyspace-events");
  7.             if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
  8.                 connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
  9.             }
  10.         } finally {
  11.             connection.close();
  12.         }
  13.     }
  14.     this.doRegister(this.listenerContainer);
  15. }
复制代码
默认监听所有key的所有事件
  1. private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
  2. protected void doRegister(RedisMessageListenerContainer container) {
  3.     this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
  4. }
复制代码
可以在扩展这个监听器时, 可以通过重写 doRegister 自界说需要监听的事件类型
  1. public class KeyDeleteEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
  2.     private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:del");
  3.     public KeyDeleteEventMessageListener(RedisMessageListenerContainer listenerContainer) {
  4.         super(listenerContainer);
  5.     }
  6.     protected void doRegister(RedisMessageListenerContainer listenerContainer) {
  7.         listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
  8.     }
  9.     protected void doHandleMessage(Message message) {
  10.         // 业务处理
  11.     }
  12. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

瑞星

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表