天机学堂7--Redisson自定义注解AOP以及SPEL表达式实现分布式锁 ...

锦通  论坛元老 | 2025-1-25 08:27:03 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1015|帖子 1015|积分 3045

集群下的锁失效题目

Synchronized中的重量级锁,底层就是基于锁监视器(Monitor)来实现的。简单来说就是锁对象头会指向一个锁监视器,而在监视器中则会记录一些信息,比如:


  • _owner:持有锁的线程
  • _recursions:锁重入次数
每一个锁对象,都会指向一个锁监视器,而每一个锁监视器,同一时刻只能被一个线程持有,这样就实现了互斥效果。但前提是,多个线程使用的是同一把锁。
但题目来了,我们的服务将来肯定会多实例不是,形成集群。每一个实例都会有一个自己的JVM运行环境,因此即便是同一个用户,如果并发的发起了多个请求,由于请求进入了多个JVM,就会出现多个锁对象(用户id对象),自然就有多个锁监视器。此时就会出现每个JVM内部都有一个线程获取锁乐成的环境,没有达到互斥的效果,并发安全题目就大概再次发生了:

我们不能让每个实例去使用各自的JVM内部锁监视器,而是应该在多个实例外部寻找一个锁监视器,多个实例争抢同一把锁。像这样的锁,就称为分布式锁。
分布式锁必须要满足的特征:


  • 多JVM实例都可以访问
  • 互斥
能满足上述特征的组件有很多,因此实现分布式锁的方式也非常多,比方:


  • 基于MySQL
  • 基于Redis
  • 基于Zookeeper
  • 基于ETCD
    但目前使用最广泛的还应该是基于Redis的分布式锁。
Redis中的setnx命令实现分布式锁

Redis本身可以被恣意JVM实例访问,同时Redis中的setnx命令具备互斥性,因此符合分布式锁的需求
setnx基本原理

Redis的setnx命令是对string范例数据的操作,语法如下:
给key赋值为value: SETNX key value
当前仅当key不存在的时间,setnx才能实行乐成,而且返回1,其它环境都会实行失败,而且返回0.我们就可以认为返回值是1就是获取锁乐成,返回值是0就是获取锁失败,实现互斥效果。
而当业务实行完成时,我们只需要删除这个key即可开释锁。这个时间其它线程又可以再次获取锁(实行setnx乐成)了。
删除指定key,用来开释锁: DEL key
死锁题目

不过我们要考虑一种极端环境,比如我们获取锁乐成,还未开释锁呢当前实例突然宕机了!那么开释锁的逻辑自然就永远不会被实行,这样lock就永远存在,再也不会有其它线程获取锁乐成了!出现了死锁题目: 利用Redis的KEY过期时间机制,在获取锁时给锁添加一个超时时间:
获取锁,并记录持有锁的线程: SETNX lock thread1
设置过期时间,避免死锁: EXPIRE lock 20
这里我们设置超时时间为20秒,远超任务实行时间。当业务正常实行时,这个过期时间不起作用
但是如果当前服务实例宕机,DEL无法实行。但由于我们设置了20秒的过期时间,当超过这个时间时,锁会由于过期被删除,因此就等于开释锁了,从而避免了死锁题目。这种策略就是超时开释锁策略。
但新的题目来了,SETNX和EXPIRE是两条命令,如果我实行完SETNX,还没来得急实行EXPIRE时服务已经宕机了,这样加锁乐成,但锁超时时间依然没能设置!死锁题目岂不是再次发生了?!
所以,必须包管SETNX和EXPIRE两个操作的原子性。究竟上,Redis中的set命令就能同时实现setnx和expire的效果:
NX 等同于SETNX lock thread1效果, EX 20 等同于 EXPIRE lock 20效果
SET lock thread1 NX EX 20
利用Redis实现的简单分布式锁流程


  1. public class RedisLock {
  2.     private final String key;
  3.     private final StringRedisTemplate redisTemplate;
  4.     /**
  5.      * 尝试获取锁
  6.      * @param leaseTime 锁自动释放时间
  7.      * @param unit 时间单位
  8.      * @return 是否获取成功,true:获取锁成功;false:获取锁失败
  9.      */
  10.     public boolean tryLock(long leaseTime, TimeUnit unit){
  11.         // 1.获取线程名称
  12.         String threadValue = Thread.currentThread().getName();
  13.         // 2.获取锁
  14.         Boolean success = redisTemplate.opsForValue().setIfAbsent(key, threadValue, leaseTime, unit);
  15.         // 3.返回结果
  16.         return BooleanUtils.isTrue(success);
  17.     }
  18.     /**
  19.      * 释放锁
  20.      */
  21.     public void unlock(){
  22.         redisTemplate.delete(key);
  23.     }
  24. }
复制代码
setnx的分布式锁的题目

锁误删题目

比方,有线程1获取锁乐成,而且实行完任务,正准备开释锁,但是由于某种原因导致线程1开释锁的操作被阻塞了,直到锁被超时开释。就在此时,有一个新的线程2来尝试获取锁。由于线程1的锁被超时开释,因此线程2是可以获取锁乐成的。而就在此时,线程1醒来,继续实行开释锁的操作,也就是DEL.结果就把线程2的锁给删除了。然而此时线程2还在实行任务,如果有其它线程再来获取锁,就会认为无人持有锁从而获取锁乐成,于是多个线程再次并行实行,并发安全题目就大概再次发生了
办理方法:开释锁前要检查是不是自己的锁
超时开释题目

线程1获取锁乐成,而且实行业务完成,而且也判定了锁标示,确实与自己一致:
接下来,线程1应该去开释自己的锁了,可就在此时发生了阻塞!直到锁超时开释:然后,线程2来获取锁,又和上面一样了。
总结一下,误删的原因归根结底是由于什么?


  • 超时开释
  • 判定锁标示、删除锁两个动作不是原子操作
操作锁的多行命令又该如何确保原子性?
其它题目

除了上述题目以外,分布式锁还会遇到一些其它题目:


  • 锁的重入题目:同一个线程多次获取锁的场景,目前不支持,大概会导致死锁
  • 锁失败的重试题目:获取锁失败后要不要重试?目前是直接失败,不支持重试
  • Redis主从的一致性题目:由于主从同步存在延迟,当线程在主节点获取锁后,从节点大概未同步锁信息。如果此时主宕机,会出现锁失效环境。此时会有其它线程也获取锁乐成。从而出现并发安全题目。

固然,上述题目并非无法办理,只不过会比力麻烦。比方:


  • 原子性题目:可以利用Redis的LUA脚本来编写锁操作,确保原子性
  • 超时题目:利用WatchDog(看门狗)机制,获取锁乐成时开启一个定时任务,在锁到期前自动续期,避免超时开释。而当服务宕机后,WatchDog跟着停止运行,不会导致死锁。
  • 锁重入题目:可以模拟Synchronized原理,放弃setnx,而是利用Redis的Hash结构来记录锁的持有者以及重入次数,获取锁时重入次数+1,开释锁是重入次数-1,次数为0则锁删除
  • 主从一致性题目:可以利用Redis官网推荐的RedLock机制来办理
这些办理方案实现起来比力复杂,因此我们通常会使用一些开源框架来实现分布式锁,而不是自己来编码实现。目前对这些办理方案实现的比力完满的一个第三方组件:Redisson
Redisson


首先引入依赖:
  1. <dependency>
  2.     <groupId>org.redisson</groupId>
  3.     <artifactId>redisson</artifactId>
  4. </dependency>
复制代码
然后是设置:
  1. @Configuration
  2. public class RedisConfig {
  3.    @Bean
  4.    public RedissonClient redissonClient() {
  5.        // 配置类
  6.        Config config = new Config();
  7.        // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
  8.        config.useSingleServer()
  9.            .setAddress("redis://192.168.150.101:6379")
  10.            .setPassowrd("123321");
  11.        // 创建客户端
  12.        return Redisson.create(config);
  13.    }
  14. }
复制代码
tj-common内里已经设置了,所以不需要重复设置

最后是基本用法:
  1. @Autowired
  2. private RedissonClient redissonClient;
  3. @Test
  4. void testRedisson() throws InterruptedException {
  5.   // 1.获取锁对象,指定锁名称
  6.   RLock lock = redissonClient.getLock("anyLock");//anylock是锁的名字也是redis的键值
  7.   try {
  8.       // 2.尝试获取锁,参数:waitTime、leaseTime、时间单位
  9.       boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
  10.       if (!isLock) {
  11.           // 获取锁失败处理 ..
  12.       } else {
  13.           // 获取锁成功处理
  14.       }
  15.   } finally {
  16.       // 4.释放锁
  17.       lock.unlock();//判断锁是否属于自己+原子性都有实现
  18.   }
  19. }
复制代码

Watch Dog看门狗不能设置失效时间,会设置默认的失效时间。
Redisson办理上面的题目:

  • 原子性: Lua包管 判定锁是不是自己的,操作的原子性
  • 超时题目:Watch Dog看门狗,会专门创建一个线程,监控当前的分布式锁有没有结束,(如果正在使用着锁)没有结束的话会每10s调解过期时间。就检测到正在使用着会把过期时间重置回30s, 不消担心“我正在用着锁开释了”导致的安全题目
  • 不可重入:Redis的Hash结构来记录锁的持有者以及重入次数
  • 失败重试:redis的发布订阅(Pub/Sub)
  • 主从一致性题目:可以利用Redis官网推荐的RedLock机制来办理。



向games频道发送消息,上面订阅了games频道的就会收到消息’hello’

基于注解的分布式锁

基于AOP的思想,将业务部分作为切入点,将业务前后的锁操作作为环绕加强。注解的焦点作用是两个:


  • 标记切入点
  • 传递锁参数
注解本身起到标记作用,同时还要带上锁参数:


  • 锁名称
  • 锁等待时间
  • 锁超时时间
  • 时间单位
Step1:自定义注解锁:
  1. package com.tianji.promotion.annotation;
  2. import com.tianji.promotion.enums.MyLockStrategy;
  3. import com.tianji.promotion.enums.MyLockType;
  4. import java.lang.annotation.ElementType;
  5. import java.lang.annotation.Retention;
  6. import java.lang.annotation.RetentionPolicy;
  7. import java.lang.annotation.Target;
  8. import java.util.concurrent.TimeUnit;
  9. @Retention(RetentionPolicy.RUNTIME) // 运行时生效
  10. @Target(ElementType.METHOD) // 作用于方法
  11. public @interface MyLock {
  12.     String name();  // 锁名称
  13.     long waitTime() default 1;  // 申请锁的等待时间
  14.     long leaseTime() default -1;    // 持有锁的TTL有效时间
  15.     TimeUnit unit() default TimeUnit.SECONDS;   // 时间单位
  16. }
复制代码
Step2:定义切面类:

Step3:定义好了锁注解和切面,接下来就可以改造业务了:

怎么定义@Transactional和@MyLock的顺序,默认事务的实行顺序比力靠后(其注解内里的order值较高顺序靠后)。 所以 能包管是 先获取锁再实行事务
工厂模式 选择锁范例

Step1: 在注解 锁MyLock内里加入一个属性
锁的范例,默认为可重入锁,由工厂模式根据lockType进行创建
  1. MyLockType lockType() default MyLockType.RE_ENTRANT_LOCK;
复制代码
Step2: 在切面类中创建锁对象(更新为工厂模式创建)
  1. private final MyLockFactory myLockFactory;
  2. //RLock lock = redissonClient.getLock(myLock.name());  // 只能获取可重入锁
  3. RLock lock = myLockFactory.getLock(myLock.lockType(), lockName);
复制代码
工厂模式MyLockFactory:
  1. import com.tianji.promotion.enums.MyLockType;
  2. import org.redisson.api.RLock;
  3. import org.redisson.api.RedissonClient;
  4. @Component
  5. public class MyLockFactory {
  6.         // 锁对象类型,方法引用
  7.     private final Map<MyLockType, Function<String, RLock>> lockHandlers;
  8.     /**
  9.      * 使用工厂模式,来实现不同的锁类型
  10.      *
  11.      * @param redissonClient Redisson 客户端实例
  12.      */
  13.     public MyLockFactory(RedissonClient redissonClient) {
  14.         // 初始化锁处理器映射表
  15.         this.lockHandlers = new EnumMap<>(MyLockType.class);
  16.         // 添加不同类型的锁处理器到映射表中
  17.         this.lockHandlers.put(MyLockType.RE_ENTRANT_LOCK, redissonClient::getLock);
  18.         this.lockHandlers.put(MyLockType.FAIR_LOCK, redissonClient::getFairLock);
  19.         this.lockHandlers.put(MyLockType.READ_LOCK, name -> redissonClient.getReadWriteLock(name).readLock());
  20.         this.lockHandlers.put(MyLockType.WRITE_LOCK, name -> redissonClient.getReadWriteLock(name).writeLock());
  21.     }
  22.     /**
  23.      * 获取指定类型的锁实例
  24.      *
  25.      * @param lockType 锁类型
  26.      * @param name     锁名称
  27.      * @return 对应类型的锁实例
  28.      */
  29.     public RLock getLock(MyLockType lockType, String name){
  30.         // get获取锁类型的引用,apply调用对应的创建方法
  31.         return lockHandlers.get(lockType).apply(name);
  32.     }
  33. }
复制代码
  1. public enum MyLockType {
  2.     RE_ENTRANT_LOCK, // 可重入锁
  3.     FAIR_LOCK, // 公平锁
  4.     READ_LOCK, // 读锁
  5.     WRITE_LOCK, // 写锁
  6.     ;
  7. }
复制代码
策略模式提供 重试策略 + 失败策略组合

定义罗列类,罗列Redisson分布式锁的锁失败的处置惩罚策略:
  1. import com.tianji.promotion.annotation.MyLock;
  2. import org.redisson.api.RLock;
  3. public enum MyLockStrategy {
  4.     SKIP_FAST(){    // 枚举项,快速结束 = 不重试+快速失败
  5.         @Override
  6.         public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
  7.             return lock.tryLock(0, prop.leaseTime(), prop.unit());
  8.         }
  9.     },
  10.     FAIL_FAST(){    // 枚举项,快速失败 = 不重试+抛出异常
  11.         @Override
  12.         public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
  13.             boolean isLock = lock.tryLock(0, prop.leaseTime(), prop.unit());
  14.             if (!isLock) {
  15.                 throw new BizIllegalException("请求太频繁");
  16.             }
  17.             return true;
  18.         }
  19.     },
  20.     KEEP_TRYING(){  // 枚举项,无限重试
  21.         @Override
  22.         public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
  23.             lock.lock( prop.leaseTime(), prop.unit());
  24.             return true;
  25.         }
  26.     },
  27.     SKIP_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后结束 = 有限重试+直接结束
  28.         @Override
  29.         public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
  30.             return lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
  31.         }
  32.     },
  33.     FAIL_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后失败 = 有限重试+抛出异常
  34.         @Override
  35.         public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
  36.             boolean isLock = lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
  37.             if (!isLock) {
  38.                 throw new BizIllegalException("请求太频繁");
  39.             }
  40.             return true;
  41.         }
  42.     },
  43.     ;
  44.     public abstract boolean tryLock(RLock lock, MyLock prop) throws InterruptedException;
  45. }
复制代码
和工厂模式 选择锁范例一样, 在注解 锁MyLock内里加入一个属性,实现可选策略:
  1. // 锁的失败策略,默认为重试超时后失败(有限重试,失败后抛出异常),由工厂模式根据lockType进行创建
  2. MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT;
复制代码
修改切面代码,基于用户选择的策略来处置惩罚:

就可以在使用锁的时间自由选择锁范例、锁策略了:

基于SPEL的动态锁名

现在实现的锁版本还没有userID

在当前业务中,我们的锁对象本来应该是当前登录用户,是动态获取的。而加锁是基于注解参数添加的,在编码时就需要指定。怎么办?
Spring中提供了一种表达式语法,称为SPEL表达式,可以实行java代码,获取恣意参数。
思路:
我们可以让用户指定锁名称参数时不要写死,而是基于SPEL表达式。在创建锁对象时,剖析SPEL表达式,动态获取锁名称。
首先,在使用锁注解时,锁名称可以利用SPEL表达式,比方我们指定锁名称中要包罗参数中的用户id,则可以这样写:

而如果是通过UserContext.getUser()获取,则可以利用下面的语法:
在这里插入图片形貌
这里T(类名).方法名()就是调用静态方法。
获取锁名称用的是getLockName()这个方法:
  1. /**
  2. * SPEL的正则规则
  3. */
  4. private static final Pattern pattern = Pattern.compile("\\#\\{([^\\}]*)\\}");
  5. /**
  6. * 方法参数解析器
  7. */
  8. private static final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
  9. /**
  10. * 解析锁名称
  11. * @param name 原始锁名称
  12. * @param pjp 切入点
  13. * @return 解析后的锁名称
  14. */
  15. private String getLockName(String name, ProceedingJoinPoint pjp) {
  16.     // 1.判断是否存在spel表达式
  17.     if (StringUtils.isBlank(name) || !name.contains("#")) {
  18.         // 不存在,直接返回
  19.         return name;
  20.     }
  21.     // 2.构建context,也就是SPEL表达式获取参数的上下文环境,这里上下文就是切入点的参数列表
  22.     EvaluationContext context = new MethodBasedEvaluationContext(
  23.             TypedValue.NULL, resolveMethod(pjp), pjp.getArgs(), parameterNameDiscoverer);
  24.     // 3.构建SPEL解析器
  25.     ExpressionParser parser = new SpelExpressionParser();
  26.     // 4.循环处理,因为表达式中可以包含多个表达式
  27.     Matcher matcher = pattern.matcher(name);
  28.     while (matcher.find()) {
  29.         // 4.1.获取表达式
  30.         String tmp = matcher.group();
  31.         String group = matcher.group(1);
  32.         // 4.2.这里要判断表达式是否以 T字符开头,这种属于解析静态方法,不走上下文
  33.         Expression expression = parser.parseExpression(group.charAt(0) == 'T' ? group : "#" + group);
  34.         // 4.3.解析出表达式对应的值
  35.         Object value = expression.getValue(context);
  36.         // 4.4.用值替换锁名称中的SPEL表达式
  37.         name = name.replace(tmp, ObjectUtils.nullSafeToString(value));
  38.     }
  39.     return name;
  40. }
  41. private Method resolveMethod(ProceedingJoinPoint pjp) {
  42.     // 1.获取方法签名
  43.     MethodSignature signature = (MethodSignature)pjp.getSignature();
  44.     // 2.获取字节码
  45.     Class<?> clazz = pjp.getTarget().getClass();
  46.     // 3.方法名称
  47.     String name = signature.getName();
  48.     // 4.方法参数列表
  49.     Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
  50.     return tryGetDeclaredMethod(clazz, name, parameterTypes);
  51. }
  52. private Method tryGetDeclaredMethod(Class<?> clazz, String name, Class<?> ... parameterTypes){
  53.     try {
  54.         // 5.反射获取方法
  55.         return clazz.getDeclaredMethod(name, parameterTypes);
  56.     } catch (NoSuchMethodException e) {
  57.         Class<?> superClass = clazz.getSuperclass();
  58.         if (superClass != null) {
  59.             // 尝试从父类寻找
  60.             return tryGetDeclaredMethod(superClass, name, parameterTypes);
  61.         }
  62.     }
  63.     return null;
  64. }
复制代码
剖析SPEL
在切面中,我们需要基于注解中的锁名称做动态剖析,而不是直接使用名称:

相关知识点

自定义注解

  1. @Retention(RetentionPolicy.RUNTIME)
  2. @Target({ElementType.METHOD})
  3. @Documented
  4. public @interface xxxx{
  5. }
复制代码
通知类:内里的切点表达式,public是方法返回范例,路径…*是service及其子包下的类 后面的.*是恣意方法 (…)指恣意参数

不消切点表达式而是用注解 控制仅实现类中的一个方法:(@annotation(路径.实现类中方法的注解)
注解直接写到方法的参数上:@Around("@annotation(xxx类)")

以上图为例,只要方法加了@printTime注解,方法就是切点,再走这个方法之前就会走环绕通知@Around() around()方法
Redis 的 Pub/Sub (发布/订阅) 是一种消息传递机制,它允许客户端订阅一个或多个频道,并接收其他客户端发布到这些频道的消息。在使用 Redis Pub/Sub 的过程中,大概会遇到由于网络故障、订阅客户端崩溃或其他原因导致消息接收失败的环境。因此,失败重试机制可以资助包管消息在分布式环境下的可靠性。
Redis Pub/Sub 的工作原理



  • 发布者 (Publisher) 将消息发布到指定的频道。
  • 订阅者 (Subscriber) 订阅频道,并监听来自该频道的消息。
  • 当有消息发布到订阅的频道时,Redis 会将这些消息推送到全部订阅了该频道的客户端。
失败重试机制在 Redis Pub/Sub 中的应用

在 Redis Pub/Sub 中,如果出现网络题目或客户端挂掉导致的消息丢失,默认环境下消息不会被重试或保存(Redis 本身不支持长期化消息)。因此,如果需要实现失败重试机制,可以采取以下几种策略:
1. 消息确认和重试机制



  • 题目:如果消息在订阅者接收时失败(比如网络停止、订阅者崩溃等),这些消息会丢失。
  • 办理方案:一种常见的做法是通过消息确认机制来实现重试。每个消息可以通过客户端进行确认,如果未乐成处置惩罚消息,则将其重新发布到一个队列或另一个频道,等待下一次重试。
实现方式


  • 订阅者在接收到消息时,需要向发布者或消息队列发送确认信号。如果在一定时间内没有收到确认,可以将该消息重新推送到某个死信队列(Dead Letter Queue,DLQ)大概一个等待重试的队列中,比及订阅者规复正常后,再进行重试。
示例
  1. // Redis Pub/Sub 订阅者代码(使用 Jedis 客户端)
  2. public class MySubscriber extends JedisPubSub {
  3.     @Override
  4.     public void onMessage(String channel, String message) {
  5.         try {
  6.             // 处理消息
  7.             processMessage(message);
  8.             // 发送确认信号
  9.             sendAcknowledgment(message);
  10.         } catch (Exception e) {
  11.             // 处理失败,重试机制
  12.             handleFailure(message);
  13.         }
  14.     }
  15.     private void handleFailure(String message) {
  16.         // 如果处理失败,可以将消息重新推送到一个队列或保存到死信队列
  17.         redisClient.lpush("retryQueue", message);
  18.     }
  19.     // 确认消息已处理
  20.     private void sendAcknowledgment(String message) {
  21.         redisClient.publish("acknowledgmentChannel", message); // 可选的确认机制
  22.     }
  23. }
复制代码
2. 使用 Redis Streams 替换 Pub/Sub

Redis Streams 是一种基于日志的消息队列结构,适合需要消息长期化和重试的场景。与传统的 Pub/Sub 模式不同,Redis Streams 可以存储消息,而且订阅者可以从流的恣意位置读取消息,避免了消息丢失的题目。
特点


  • 长期化:Redis Streams 会长期化消息到磁盘,避免了消息丢失的风险。
  • 消息确认和重试:消息的消费者可以通过 XACK 命令显式地确认消息,如果没有确认,Redis 可以将消息重新分配给其他消费者或重新发送给原消费者进行重试。
示例
  1. // 发布消息到 Redis Stream
  2. redisClient.xadd("mystream", Map.of("message", "hello"));
  3. // 订阅者处理消息
  4. while (true) {
  5.     List<Map.Entry<String, List<StreamEntry>>> messages = redisClient.xread(
  6.             StreamEntryID.UNRECEIVED, Map.of("mystream", "0"));
  7.    
  8.     for (Map.Entry<String, List<StreamEntry>> streamEntry : messages) {
  9.         for (StreamEntry entry : streamEntry.getValue()) {
  10.             try {
  11.                 processMessage(entry.getFields().get("message"));
  12.                 redisClient.xack("mystream", "consumerGroup", entry.getID()); // 消息确认
  13.             } catch (Exception e) {
  14.                 // 处理失败,可以重新发送消息进行重试
  15.                 redisClient.xadd("retryQueue", Map.of("message", entry.getFields().get("message")));
  16.             }
  17.         }
  18.     }
  19. }
复制代码
使用 Redis Streams 的优势


  • 消息不丢失:Stream 中的消息会长期化在 Redis 中,订阅者可以在后续任何时间读取。
  • 自动重试:可以通过消费组的方式来确保如果某个消费者失败,其他消费者会接管任务。
  • 确认机制:消费者可以确认已处置惩罚的消息,如果没有确认,Redis 会重新分配任务。
3. 使用 Redis Pub/Sub 的失败重试方案

如果仍然希望使用 Redis 的传统 Pub/Sub 模式并实现某种程度的消息重试,可以结合一些外部机制,比方将消息发布到 Redis Pub/Sub 频道后,同时将消息也存储到一个队列中(如 Redis List、Redis Stream),然后通过定时任务或背景进程来检查未确认的消息。
步骤


  • 订阅者从 Redis 频道获取消息时,在处置惩罚前将消息的 ID 记录下来。
  • 如果处置惩罚失败,订阅者会将消息 ID 添加到一个待重试的队列中(比方 Redis List 或 Stream)。
  • 定期检查待重试队列并重试这些消息。
总结

固然 Redis Pub/Sub 本身并不直接支持消息的失败重试机制,但可以通过以下几种方式来实现:


  • 使用 Redis Streams 取代 Pub/Sub,利用其消息长期化和消费确认功能来实现失败重试。
  • 使用 消息确认机制,结合 Redis 队列(如 List、Stream)将失败的消息重新推送,进行后续重试。
  • 如果不希望改变现有的 Pub/Sub 模式,可以通过背景任务周期性地重试失败消息,将消息记录在专门的队列中。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表