集群下的锁失效题目
Synchronized中的重量级锁,底层就是基于锁监视器(Monitor)来实现的。简单来说就是锁对象头会指向一个锁监视器,而在监视器中则会记录一些信息,比如:
- _owner:持有锁的线程
- _recursions:锁重入次数
每一个锁对象,都会指向一个锁监视器,而每一个锁监视器,同一时刻只能被一个线程持有,这样就实现了互斥效果。但前提是,多个线程使用的是同一把锁。
但题目来了,我们的服务将来肯定会多实例不是,形成集群。每一个实例都会有一个自己的JVM运行环境,因此即便是同一个用户,如果并发的发起了多个请求,由于请求进入了多个JVM,就会出现多个锁对象(用户id对象),自然就有多个锁监视器。此时就会出现每个JVM内部都有一个线程获取锁乐成的环境,没有达到互斥的效果,并发安全题目就大概再次发生了:
我们不能让每个实例去使用各自的JVM内部锁监视器,而是应该在多个实例外部寻找一个锁监视器,多个实例争抢同一把锁。像这样的锁,就称为分布式锁。
分布式锁必须要满足的特征:
能满足上述特征的组件有很多,因此实现分布式锁的方式也非常多,比方:
- 基于MySQL
- 基于Redis
- 基于Zookeeper
- 基于ETCD
但目前使用最广泛的还应该是基于Redis的分布式锁。
Redis中的setnx命令实现分布式锁
Redis本身可以被恣意JVM实例访问,同时Redis中的setnx命令具备互斥性,因此符合分布式锁的需求
setnx基本原理
Redis的setnx命令是对string范例数据的操作,语法如下:
给key赋值为value: SETNX key value
当前仅当key不存在的时间,setnx才能实行乐成,而且返回1,其它环境都会实行失败,而且返回0.我们就可以认为返回值是1就是获取锁乐成,返回值是0就是获取锁失败,实现互斥效果。
而当业务实行完成时,我们只需要删除这个key即可开释锁。这个时间其它线程又可以再次获取锁(实行setnx乐成)了。
删除指定key,用来开释锁: DEL key
死锁题目
不过我们要考虑一种极端环境,比如我们获取锁乐成,还未开释锁呢当前实例突然宕机了!那么开释锁的逻辑自然就永远不会被实行,这样lock就永远存在,再也不会有其它线程获取锁乐成了!出现了死锁题目: 利用Redis的KEY过期时间机制,在获取锁时给锁添加一个超时时间:
获取锁,并记录持有锁的线程: SETNX lock thread1
设置过期时间,避免死锁: EXPIRE lock 20
这里我们设置超时时间为20秒,远超任务实行时间。当业务正常实行时,这个过期时间不起作用
但是如果当前服务实例宕机,DEL无法实行。但由于我们设置了20秒的过期时间,当超过这个时间时,锁会由于过期被删除,因此就等于开释锁了,从而避免了死锁题目。这种策略就是超时开释锁策略。
但新的题目来了,SETNX和EXPIRE是两条命令,如果我实行完SETNX,还没来得急实行EXPIRE时服务已经宕机了,这样加锁乐成,但锁超时时间依然没能设置!死锁题目岂不是再次发生了?!
所以,必须包管SETNX和EXPIRE两个操作的原子性。究竟上,Redis中的set命令就能同时实现setnx和expire的效果:
NX 等同于SETNX lock thread1效果, EX 20 等同于 EXPIRE lock 20效果
SET lock thread1 NX EX 20
利用Redis实现的简单分布式锁流程
- public class RedisLock {
- private final String key;
- private final StringRedisTemplate redisTemplate;
- /**
- * 尝试获取锁
- * @param leaseTime 锁自动释放时间
- * @param unit 时间单位
- * @return 是否获取成功,true:获取锁成功;false:获取锁失败
- */
- public boolean tryLock(long leaseTime, TimeUnit unit){
- // 1.获取线程名称
- String threadValue = Thread.currentThread().getName();
- // 2.获取锁
- Boolean success = redisTemplate.opsForValue().setIfAbsent(key, threadValue, leaseTime, unit);
- // 3.返回结果
- return BooleanUtils.isTrue(success);
- }
- /**
- * 释放锁
- */
- public void unlock(){
- redisTemplate.delete(key);
- }
- }
复制代码 setnx的分布式锁的题目
锁误删题目
比方,有线程1获取锁乐成,而且实行完任务,正准备开释锁,但是由于某种原因导致线程1开释锁的操作被阻塞了,直到锁被超时开释。就在此时,有一个新的线程2来尝试获取锁。由于线程1的锁被超时开释,因此线程2是可以获取锁乐成的。而就在此时,线程1醒来,继续实行开释锁的操作,也就是DEL.结果就把线程2的锁给删除了。然而此时线程2还在实行任务,如果有其它线程再来获取锁,就会认为无人持有锁从而获取锁乐成,于是多个线程再次并行实行,并发安全题目就大概再次发生了
办理方法:开释锁前要检查是不是自己的锁
超时开释题目
线程1获取锁乐成,而且实行业务完成,而且也判定了锁标示,确实与自己一致:
接下来,线程1应该去开释自己的锁了,可就在此时发生了阻塞!直到锁超时开释:然后,线程2来获取锁,又和上面一样了。
总结一下,误删的原因归根结底是由于什么?
操作锁的多行命令又该如何确保原子性?
其它题目
除了上述题目以外,分布式锁还会遇到一些其它题目:
- 锁的重入题目:同一个线程多次获取锁的场景,目前不支持,大概会导致死锁
- 锁失败的重试题目:获取锁失败后要不要重试?目前是直接失败,不支持重试
- Redis主从的一致性题目:由于主从同步存在延迟,当线程在主节点获取锁后,从节点大概未同步锁信息。如果此时主宕机,会出现锁失效环境。此时会有其它线程也获取锁乐成。从而出现并发安全题目。
- …
固然,上述题目并非无法办理,只不过会比力麻烦。比方:
- 原子性题目:可以利用Redis的LUA脚本来编写锁操作,确保原子性
- 超时题目:利用WatchDog(看门狗)机制,获取锁乐成时开启一个定时任务,在锁到期前自动续期,避免超时开释。而当服务宕机后,WatchDog跟着停止运行,不会导致死锁。
- 锁重入题目:可以模拟Synchronized原理,放弃setnx,而是利用Redis的Hash结构来记录锁的持有者以及重入次数,获取锁时重入次数+1,开释锁是重入次数-1,次数为0则锁删除
- 主从一致性题目:可以利用Redis官网推荐的RedLock机制来办理
这些办理方案实现起来比力复杂,因此我们通常会使用一些开源框架来实现分布式锁,而不是自己来编码实现。目前对这些办理方案实现的比力完满的一个第三方组件:Redisson
Redisson
首先引入依赖:
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- </dependency>
复制代码 然后是设置:
- @Configuration
- public class RedisConfig {
- @Bean
- public RedissonClient redissonClient() {
- // 配置类
- Config config = new Config();
- // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
- config.useSingleServer()
- .setAddress("redis://192.168.150.101:6379")
- .setPassowrd("123321");
- // 创建客户端
- return Redisson.create(config);
- }
- }
复制代码 tj-common内里已经设置了,所以不需要重复设置
最后是基本用法:
- @Autowired
- private RedissonClient redissonClient;
- @Test
- void testRedisson() throws InterruptedException {
- // 1.获取锁对象,指定锁名称
- RLock lock = redissonClient.getLock("anyLock");//anylock是锁的名字也是redis的键值
- try {
- // 2.尝试获取锁,参数:waitTime、leaseTime、时间单位
- boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
- if (!isLock) {
- // 获取锁失败处理 ..
- } else {
- // 获取锁成功处理
- }
- } finally {
- // 4.释放锁
- lock.unlock();//判断锁是否属于自己+原子性都有实现
- }
- }
复制代码
Watch Dog看门狗不能设置失效时间,会设置默认的失效时间。
Redisson办理上面的题目:
- 原子性: Lua包管 判定锁是不是自己的,操作的原子性
- 超时题目:Watch Dog看门狗,会专门创建一个线程,监控当前的分布式锁有没有结束,(如果正在使用着锁)没有结束的话会每10s调解过期时间。就检测到正在使用着会把过期时间重置回30s, 不消担心“我正在用着锁开释了”导致的安全题目
- 不可重入:Redis的Hash结构来记录锁的持有者以及重入次数
- 失败重试:redis的发布订阅(Pub/Sub)
- 主从一致性题目:可以利用Redis官网推荐的RedLock机制来办理。
向games频道发送消息,上面订阅了games频道的就会收到消息’hello’
基于注解的分布式锁
基于AOP的思想,将业务部分作为切入点,将业务前后的锁操作作为环绕加强。注解的焦点作用是两个:
注解本身起到标记作用,同时还要带上锁参数:
Step1:自定义注解锁:
- package com.tianji.promotion.annotation;
- import com.tianji.promotion.enums.MyLockStrategy;
- import com.tianji.promotion.enums.MyLockType;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
- import java.util.concurrent.TimeUnit;
- @Retention(RetentionPolicy.RUNTIME) // 运行时生效
- @Target(ElementType.METHOD) // 作用于方法
- public @interface MyLock {
- String name(); // 锁名称
- long waitTime() default 1; // 申请锁的等待时间
- long leaseTime() default -1; // 持有锁的TTL有效时间
- TimeUnit unit() default TimeUnit.SECONDS; // 时间单位
- }
复制代码 Step2:定义切面类:
Step3:定义好了锁注解和切面,接下来就可以改造业务了:
怎么定义@Transactional和@MyLock的顺序,默认事务的实行顺序比力靠后(其注解内里的order值较高顺序靠后)。 所以 能包管是 先获取锁再实行事务
工厂模式 选择锁范例
Step1: 在注解 锁MyLock内里加入一个属性
锁的范例,默认为可重入锁,由工厂模式根据lockType进行创建
- MyLockType lockType() default MyLockType.RE_ENTRANT_LOCK;
复制代码 Step2: 在切面类中创建锁对象(更新为工厂模式创建)
- private final MyLockFactory myLockFactory;
- //RLock lock = redissonClient.getLock(myLock.name()); // 只能获取可重入锁
- RLock lock = myLockFactory.getLock(myLock.lockType(), lockName);
复制代码 工厂模式MyLockFactory:
- import com.tianji.promotion.enums.MyLockType;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- @Component
- public class MyLockFactory {
- // 锁对象类型,方法引用
- private final Map<MyLockType, Function<String, RLock>> lockHandlers;
- /**
- * 使用工厂模式,来实现不同的锁类型
- *
- * @param redissonClient Redisson 客户端实例
- */
- public MyLockFactory(RedissonClient redissonClient) {
- // 初始化锁处理器映射表
- this.lockHandlers = new EnumMap<>(MyLockType.class);
- // 添加不同类型的锁处理器到映射表中
- this.lockHandlers.put(MyLockType.RE_ENTRANT_LOCK, redissonClient::getLock);
- this.lockHandlers.put(MyLockType.FAIR_LOCK, redissonClient::getFairLock);
- this.lockHandlers.put(MyLockType.READ_LOCK, name -> redissonClient.getReadWriteLock(name).readLock());
- this.lockHandlers.put(MyLockType.WRITE_LOCK, name -> redissonClient.getReadWriteLock(name).writeLock());
- }
- /**
- * 获取指定类型的锁实例
- *
- * @param lockType 锁类型
- * @param name 锁名称
- * @return 对应类型的锁实例
- */
- public RLock getLock(MyLockType lockType, String name){
- // get获取锁类型的引用,apply调用对应的创建方法
- return lockHandlers.get(lockType).apply(name);
- }
- }
复制代码- public enum MyLockType {
- RE_ENTRANT_LOCK, // 可重入锁
- FAIR_LOCK, // 公平锁
- READ_LOCK, // 读锁
- WRITE_LOCK, // 写锁
- ;
- }
复制代码 策略模式提供 重试策略 + 失败策略组合
定义罗列类,罗列Redisson分布式锁的锁失败的处置惩罚策略:
- import com.tianji.promotion.annotation.MyLock;
- import org.redisson.api.RLock;
- public enum MyLockStrategy {
- SKIP_FAST(){ // 枚举项,快速结束 = 不重试+快速失败
- @Override
- public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
- return lock.tryLock(0, prop.leaseTime(), prop.unit());
- }
- },
- FAIL_FAST(){ // 枚举项,快速失败 = 不重试+抛出异常
- @Override
- public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
- boolean isLock = lock.tryLock(0, prop.leaseTime(), prop.unit());
- if (!isLock) {
- throw new BizIllegalException("请求太频繁");
- }
- return true;
- }
- },
- KEEP_TRYING(){ // 枚举项,无限重试
- @Override
- public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
- lock.lock( prop.leaseTime(), prop.unit());
- return true;
- }
- },
- SKIP_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后结束 = 有限重试+直接结束
- @Override
- public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
- return lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
- }
- },
- FAIL_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后失败 = 有限重试+抛出异常
- @Override
- public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
- boolean isLock = lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
- if (!isLock) {
- throw new BizIllegalException("请求太频繁");
- }
- return true;
- }
- },
- ;
- public abstract boolean tryLock(RLock lock, MyLock prop) throws InterruptedException;
- }
复制代码 和工厂模式 选择锁范例一样, 在注解 锁MyLock内里加入一个属性,实现可选策略:
- // 锁的失败策略,默认为重试超时后失败(有限重试,失败后抛出异常),由工厂模式根据lockType进行创建
- MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT;
复制代码 修改切面代码,基于用户选择的策略来处置惩罚:
就可以在使用锁的时间自由选择锁范例、锁策略了:
基于SPEL的动态锁名
现在实现的锁版本还没有userID

在当前业务中,我们的锁对象本来应该是当前登录用户,是动态获取的。而加锁是基于注解参数添加的,在编码时就需要指定。怎么办?
Spring中提供了一种表达式语法,称为SPEL表达式,可以实行java代码,获取恣意参数。
思路:
我们可以让用户指定锁名称参数时不要写死,而是基于SPEL表达式。在创建锁对象时,剖析SPEL表达式,动态获取锁名称。
首先,在使用锁注解时,锁名称可以利用SPEL表达式,比方我们指定锁名称中要包罗参数中的用户id,则可以这样写:

而如果是通过UserContext.getUser()获取,则可以利用下面的语法:
在这里插入图片形貌
这里T(类名).方法名()就是调用静态方法。
获取锁名称用的是getLockName()这个方法:
- /**
- * SPEL的正则规则
- */
- private static final Pattern pattern = Pattern.compile("\\#\\{([^\\}]*)\\}");
- /**
- * 方法参数解析器
- */
- private static final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
- /**
- * 解析锁名称
- * @param name 原始锁名称
- * @param pjp 切入点
- * @return 解析后的锁名称
- */
- private String getLockName(String name, ProceedingJoinPoint pjp) {
- // 1.判断是否存在spel表达式
- if (StringUtils.isBlank(name) || !name.contains("#")) {
- // 不存在,直接返回
- return name;
- }
- // 2.构建context,也就是SPEL表达式获取参数的上下文环境,这里上下文就是切入点的参数列表
- EvaluationContext context = new MethodBasedEvaluationContext(
- TypedValue.NULL, resolveMethod(pjp), pjp.getArgs(), parameterNameDiscoverer);
- // 3.构建SPEL解析器
- ExpressionParser parser = new SpelExpressionParser();
- // 4.循环处理,因为表达式中可以包含多个表达式
- Matcher matcher = pattern.matcher(name);
- while (matcher.find()) {
- // 4.1.获取表达式
- String tmp = matcher.group();
- String group = matcher.group(1);
- // 4.2.这里要判断表达式是否以 T字符开头,这种属于解析静态方法,不走上下文
- Expression expression = parser.parseExpression(group.charAt(0) == 'T' ? group : "#" + group);
- // 4.3.解析出表达式对应的值
- Object value = expression.getValue(context);
- // 4.4.用值替换锁名称中的SPEL表达式
- name = name.replace(tmp, ObjectUtils.nullSafeToString(value));
- }
- return name;
- }
- private Method resolveMethod(ProceedingJoinPoint pjp) {
- // 1.获取方法签名
- MethodSignature signature = (MethodSignature)pjp.getSignature();
- // 2.获取字节码
- Class<?> clazz = pjp.getTarget().getClass();
- // 3.方法名称
- String name = signature.getName();
- // 4.方法参数列表
- Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
- return tryGetDeclaredMethod(clazz, name, parameterTypes);
- }
- private Method tryGetDeclaredMethod(Class<?> clazz, String name, Class<?> ... parameterTypes){
- try {
- // 5.反射获取方法
- return clazz.getDeclaredMethod(name, parameterTypes);
- } catch (NoSuchMethodException e) {
- Class<?> superClass = clazz.getSuperclass();
- if (superClass != null) {
- // 尝试从父类寻找
- return tryGetDeclaredMethod(superClass, name, parameterTypes);
- }
- }
- return null;
- }
复制代码 剖析SPEL
在切面中,我们需要基于注解中的锁名称做动态剖析,而不是直接使用名称:

相关知识点
自定义注解
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ElementType.METHOD})
- @Documented
- public @interface xxxx{
- }
复制代码 通知类:内里的切点表达式,public是方法返回范例,路径…*是service及其子包下的类 后面的.*是恣意方法 (…)指恣意参数
不消切点表达式而是用注解 控制仅实现类中的一个方法:(@annotation(路径.实现类中方法的注解)
注解直接写到方法的参数上:@Around("@annotation(xxx类)")
以上图为例,只要方法加了@printTime注解,方法就是切点,再走这个方法之前就会走环绕通知@Around() around()方法
Redis 的 Pub/Sub (发布/订阅) 是一种消息传递机制,它允许客户端订阅一个或多个频道,并接收其他客户端发布到这些频道的消息。在使用 Redis Pub/Sub 的过程中,大概会遇到由于网络故障、订阅客户端崩溃或其他原因导致消息接收失败的环境。因此,失败重试机制可以资助包管消息在分布式环境下的可靠性。
Redis Pub/Sub 的工作原理
- 发布者 (Publisher) 将消息发布到指定的频道。
- 订阅者 (Subscriber) 订阅频道,并监听来自该频道的消息。
- 当有消息发布到订阅的频道时,Redis 会将这些消息推送到全部订阅了该频道的客户端。
失败重试机制在 Redis Pub/Sub 中的应用
在 Redis Pub/Sub 中,如果出现网络题目或客户端挂掉导致的消息丢失,默认环境下消息不会被重试或保存(Redis 本身不支持长期化消息)。因此,如果需要实现失败重试机制,可以采取以下几种策略:
1. 消息确认和重试机制
- 题目:如果消息在订阅者接收时失败(比如网络停止、订阅者崩溃等),这些消息会丢失。
- 办理方案:一种常见的做法是通过消息确认机制来实现重试。每个消息可以通过客户端进行确认,如果未乐成处置惩罚消息,则将其重新发布到一个队列或另一个频道,等待下一次重试。
实现方式:
- 订阅者在接收到消息时,需要向发布者或消息队列发送确认信号。如果在一定时间内没有收到确认,可以将该消息重新推送到某个死信队列(Dead Letter Queue,DLQ)大概一个等待重试的队列中,比及订阅者规复正常后,再进行重试。
示例:
- // Redis Pub/Sub 订阅者代码(使用 Jedis 客户端)
- public class MySubscriber extends JedisPubSub {
- @Override
- public void onMessage(String channel, String message) {
- try {
- // 处理消息
- processMessage(message);
- // 发送确认信号
- sendAcknowledgment(message);
- } catch (Exception e) {
- // 处理失败,重试机制
- handleFailure(message);
- }
- }
- private void handleFailure(String message) {
- // 如果处理失败,可以将消息重新推送到一个队列或保存到死信队列
- redisClient.lpush("retryQueue", message);
- }
- // 确认消息已处理
- private void sendAcknowledgment(String message) {
- redisClient.publish("acknowledgmentChannel", message); // 可选的确认机制
- }
- }
复制代码 2. 使用 Redis Streams 替换 Pub/Sub
Redis Streams 是一种基于日志的消息队列结构,适合需要消息长期化和重试的场景。与传统的 Pub/Sub 模式不同,Redis Streams 可以存储消息,而且订阅者可以从流的恣意位置读取消息,避免了消息丢失的题目。
特点:
- 长期化:Redis Streams 会长期化消息到磁盘,避免了消息丢失的风险。
- 消息确认和重试:消息的消费者可以通过 XACK 命令显式地确认消息,如果没有确认,Redis 可以将消息重新分配给其他消费者或重新发送给原消费者进行重试。
示例:
- // 发布消息到 Redis Stream
- redisClient.xadd("mystream", Map.of("message", "hello"));
- // 订阅者处理消息
- while (true) {
- List<Map.Entry<String, List<StreamEntry>>> messages = redisClient.xread(
- StreamEntryID.UNRECEIVED, Map.of("mystream", "0"));
-
- for (Map.Entry<String, List<StreamEntry>> streamEntry : messages) {
- for (StreamEntry entry : streamEntry.getValue()) {
- try {
- processMessage(entry.getFields().get("message"));
- redisClient.xack("mystream", "consumerGroup", entry.getID()); // 消息确认
- } catch (Exception e) {
- // 处理失败,可以重新发送消息进行重试
- redisClient.xadd("retryQueue", Map.of("message", entry.getFields().get("message")));
- }
- }
- }
- }
复制代码 使用 Redis Streams 的优势:
- 消息不丢失:Stream 中的消息会长期化在 Redis 中,订阅者可以在后续任何时间读取。
- 自动重试:可以通过消费组的方式来确保如果某个消费者失败,其他消费者会接管任务。
- 确认机制:消费者可以确认已处置惩罚的消息,如果没有确认,Redis 会重新分配任务。
3. 使用 Redis Pub/Sub 的失败重试方案
如果仍然希望使用 Redis 的传统 Pub/Sub 模式并实现某种程度的消息重试,可以结合一些外部机制,比方将消息发布到 Redis Pub/Sub 频道后,同时将消息也存储到一个队列中(如 Redis List、Redis Stream),然后通过定时任务或背景进程来检查未确认的消息。
步骤:
- 订阅者从 Redis 频道获取消息时,在处置惩罚前将消息的 ID 记录下来。
- 如果处置惩罚失败,订阅者会将消息 ID 添加到一个待重试的队列中(比方 Redis List 或 Stream)。
- 定期检查待重试队列并重试这些消息。
总结
固然 Redis Pub/Sub 本身并不直接支持消息的失败重试机制,但可以通过以下几种方式来实现:
- 使用 Redis Streams 取代 Pub/Sub,利用其消息长期化和消费确认功能来实现失败重试。
- 使用 消息确认机制,结合 Redis 队列(如 List、Stream)将失败的消息重新推送,进行后续重试。
- 如果不希望改变现有的 Pub/Sub 模式,可以通过背景任务周期性地重试失败消息,将消息记录在专门的队列中。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |