媒介
通过前篇文章 redis 分布式锁实现
我们发现简朴做一把分布式锁没啥标题,但是针对以往的锁来说,还存在一下两点必要思量。
- 1.一个线程如果多次重复拿锁,该怎样实现重入
- 2.由于防止死锁设置了逾期时间,那么如果锁的时间到期了,业务还没有实行完毕,导致新的业务进来造成的并发标题如那边理
一、分布式重入锁
1、单机重入锁
在单机锁期间,必要支持重入告急是为了制止在单线程环境下大概出现的死锁标题,同时简化编程模子,提升代码的机动性与可重用性。
- 制止死锁:如果一个线程在持有锁的环境下,再次实行获取同一把锁,非重入锁会导致该线程不停等候自己开释锁,从而引发死锁标题。重入锁答应同一个线程多次获取同一把锁,如许就可以制止这种环境的发生。
- 简化递归调用场景:在递归方法中,方法会多次调用自己,而每次调用都必要通过同一个锁掩护共享资源。重入锁可以或许确保这些调用不会由于锁的重复获取而出现壅闭环境。
- 支持锁的调用链:在面向对象编程中,一个持有锁的方法大概会调用对象中同样必要持有锁的其他方法。重入锁包管这些方法可以顺遂实行而不会由于锁的竞争而壅闭。
- 加强机动性:重入锁数据布局更复杂,可以纪录获取锁的次数。这使得锁可以机动用在较复杂的同步场景中。
综上所述,重入锁通过答应同一线程多次获取同一把锁,制止了很多潜伏的同步标题,使得同步代码的编写变得更加简朴和可靠。
比方synchronized、ReentrantLock
2、redis重入锁
参考重入锁的筹划头脑,我们在实现redis重入锁,应该要遵照一下原则
- 互斥条件:实现锁的须要条件,标记是否有线程已占用,差别线程不能重复占用
- 线程信息:纪录线程信息,来判定加锁的是不是同一个线程
- 重入次数:纪录重入次数,再开释锁的时间,淘汰相应的次数
二、redisson实现重入锁
1、 添加依赖
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.16.3</version> <!-- 根据需要选择合适的版本 -->
- </dependency>
复制代码 2、 设置 Redisson 客户端
平凡模式
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RedissonConfig {
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSingleServer()
- .setAddress("redis://127.0.0.1:6379");
- return Redisson.create(config);
- }
- }
复制代码 集群模式
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useClusterServers()
- .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001");
- return Redisson.create(config);
- }
复制代码 哨兵模式
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSentinelServers()
- .setMasterName("masterName")
- .addSentinelAddress("redis://127.0.0.1:26379", "redis://127.0.0.1:26380");
- return Redisson.create(config);
- }
复制代码 3、 使用 Redisson 实现重入锁
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- @Service
- public class MyService {
- @Autowired
- private RedissonClient redissonClient;
- private String lock = "myLock";
- public void outerMethod() {
- RLock lock = redissonClient.getLock(lock);
- lock.lock();
- try {
- System.out.println("Outer method acquired lock");
- innerMethod();
- } finally {
- lock.unlock();
- System.out.println("Outer method released lock");
- }
- }
- private void innerMethod() {
- RLock lock = redissonClient.getLock(lock);
- lock.lock();
- try {
- System.out.println("Inner method acquired lock");
- } finally {
- lock.unlock();
- System.out.println("Inner method released lock");
- }
- }
- }
复制代码 4、 验证
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RestController
- public class MyController {
- @Autowired
- private MyService myService;
- @GetMapping("/test-lock")
- public String testLock() {
- myService.outerMethod();
- return "Lock tested successfully";
- }
- }
复制代码 5、运行项目
启动 Spring Boot 应用,并访问 http://localhost:8080/test-lock 以测试多次重入锁的实现。你应该可以或许在控制台上看到如下输出,表明锁多次重入的准确实行:
- Outer method acquired lock
- Inner method acquired lock
- Inner method released lock
- Outer method released lock
复制代码 三、redisson分布式锁分析
1、获取锁对象
- RLock lock = redissonClient.getLock(lock);
- public RLock getLock(String name) {
- return new RedissonLock(this.connectionManager.getCommandExecutor(), name, this.id);
- }
- public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
- }
- public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.id = commandExecutor.getConnectionManager().getId();
- // 默认锁释放时间 30s
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.entryName = id + ":" + name;
- }
- public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
- this.codec = codec;
- this.commandExecutor = commandExecutor;
- if (name == null) {
- throw new NullPointerException("name can't be null");
- }
- setName(name);
- }
复制代码 2、 加锁
org.redisson.RedissonLock#tryLock
- @Override
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- // 等待时间转成MS
- long time = unit.toMillis(waitTime);
- long current = System.currentTimeMillis();
- // 当前线程
- long threadId = Thread.currentThread().getId();
- // 尝试获取锁 返回空标识回去锁
- Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
-
- time -= System.currentTimeMillis() - current;
- // 超时,拿锁失败 返回false
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
-
- current = System.currentTimeMillis();
- // 订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage
- // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
- // 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
- // 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
- // 当 this.await返回true,进入循环尝试获取锁
- CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
- try {
- subscribeFuture.get(time, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.whenComplete((res, ex) -> {
- if (ex == null) {
- unsubscribe(res, threadId);
- }
- });
- }
- acquireFailed(waitTime, unit, threadId);
- return false;
- } catch (ExecutionException e) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- try {
- time -= System.currentTimeMillis() - current
- // 超时,拿锁失败 返回false;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- // 自旋获取锁
- while (true) {
- long currentTime = System.currentTimeMillis();
- // 再次加锁
- ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
- // lock acquired
- // 获得锁
- if (ttl == null) {
- return true;
- }
- time -= System.currentTimeMillis() - currentTime;
- // 超时,拿锁失败 返回false;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- // waiting for message
- currentTime = System.currentTimeMillis();
- // 下面的阻塞会在释放锁的时候,通过订阅发布及时relase
- if (ttl >= 0 && ttl < time) {
- // 如果锁的超时时间小于等待时间,通过SemaphorerelaseryAcquire阻塞锁的释放时间
- commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- // 否则,通过Semaphore的tryAcquire阻塞传入的最大等待时间
- commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- }
- } finally {
- // 取消订阅
- unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
- }
- }
复制代码 实行获取锁
- private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
- }
- private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- RFuture<Long> ttlRemainingFuture;
- if (leaseTime > 0) {
- // 释放时间同步
- ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- } else {
- // 如果没有传入锁的释放时间,默认 internalLockLeaseTime = 30000 MS
- ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
- TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- }
- CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
- // lock acquired
- // 如果返回null说明抢到了锁或者是可重入 否则直接返回还有多久过期
- if (ttlRemaining == null) {
- if (leaseTime > 0) {
- // 释放时间 赋值给 internalLockLeaseTime
- internalLockLeaseTime = unit.toMillis(leaseTime);
- } else {
- scheduleExpirationRenewal(threadId);
- }
- }
- // 没有抢到直接返回
- return ttlRemaining;
- });
- return new CompletableFutureWrapper<>(f);
- }
- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
-
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
- }
复制代码 LUA脚天职析
- # 加锁
- "if (redis.call('exists', KEYS[1]) == 0) then " # 判断我的锁是否存在 =0 为不存在 没人抢占锁
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + # 设置锁并计数重入次数
- "redis.call('pexpire', KEYS[1], ARGV[1]); " + # 设置过期时间 30S
- "return nil; " +
- "end; " +
- # 重入
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + # 进入该逻辑说明有线程抢占了锁 继续判断是否同一个线程 ==1 为同一线程
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + # 重入次数 + 1
- "redis.call('pexpire', KEYS[1], ARGV[1]); " + # 设置超时时间
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);", // 前面2个if都没进,说明锁被抢占并且不是同一线程,直接返回过期时间
复制代码 3、订阅
订阅锁状态,挂起叫醒线程
org.redisson.RedissonLock#subscribe
- public CompletableFuture<E> subscribe(String entryName, String channelName) {
- AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
- CompletableFuture<E> newPromise = new CompletableFuture<>();
- semaphore.acquire().thenAccept(c -> {
- if (newPromise.isDone()) {
- semaphore.release();
- return;
- }
- E entry = entries.get(entryName);
- if (entry != null) {
- entry.acquire();
- semaphore.release();
- entry.getPromise().whenComplete((r, e) -> {
- if (e != null) {
- newPromise.completeExceptionally(e);
- return;
- }
- newPromise.complete(r);
- });
- return;
- }
- E value = createEntry(newPromise);
- value.acquire();
- E oldValue = entries.putIfAbsent(entryName, value);
- if (oldValue != null) {
- oldValue.acquire();
- semaphore.release();
- oldValue.getPromise().whenComplete((r, e) -> {
- if (e != null) {
- newPromise.completeExceptionally(e);
- return;
- }
- newPromise.complete(r);
- });
- return;
- }
- // 创建监听,释放锁,会发送消息
- RedisPubSubListener<Object> listener = createListener(channelName, value);
- CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
- newPromise.whenComplete((r, e) -> {
- if (e != null) {
- s.completeExceptionally(e);
- }
- });
- s.whenComplete((r, e) -> {
- if (e != null) {
- value.getPromise().completeExceptionally(e);
- return;
- }
- value.getPromise().complete(value);
- });
- });
- return newPromise;
- }
复制代码 org.redisson.pubsub.PublishSubscribe#createListener
- private RedisPubSubListener<Object> createListener(String channelName, E value) {
- // 创建监听,当监听到消息回来的时候,进入onMessage进行处理
- RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
- @Override
- public void onMessage(CharSequence channel, Object message) {
- if (!channelName.equals(channel.toString())) {
- return;
- }
- PublishSubscribe.this.onMessage(value, (Long) message);
- }
- };
- return listener;
- }
复制代码 org.redisson.pubsub.LockPubSub#onMessage
- @Override
- protected void onMessage(RedissonLockEntry value, Long message) {
- if (message.equals(UNLOCK_MESSAGE)) {
- Runnable runnableToExecute = value.getListeners().poll();
- if (runnableToExecute != null) {
- runnableToExecute.run();
- }
- // 释放 Semaphore
- value.getLatch().release();
- } else if (message.equals(READ_UNLOCK_MESSAGE)) {
- while (true) {
- Runnable runnableToExecute = value.getListeners().poll();
- if (runnableToExecute == null) {
- break;
- }
- runnableToExecute.run();
- }
- value.getLatch().release(value.getLatch().getQueueLength());
- }
- }
复制代码 4、锁续期
redisson watchDog 使用时间轮技能,请参考时间轮算法分析
org.redisson.RedissonBaseLock#scheduleExpirationRenewal
- protected void scheduleExpirationRenewal(long threadId) {
- ExpirationEntry entry = new ExpirationEntry();
- // 放入EXPIRATION_RENEWAL_MAP 这个MAP中
- ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
- // 第一次进来,里面没有
- if (oldEntry != null) {
- // 如果其他线程来抢占这个锁,进入将线程ID保存至ExpirationEntry的threadIds这个Map中
- oldEntry.addThreadId(threadId);
- } else {
- // 将线程ID保存至ExpirationEntry的threadIds这个Map中
- entry.addThreadId(threadId);
- try {
- // 执行
- renewExpiration();
- } finally {
- if (Thread.currentThread().isInterrupted()) {
- cancelExpirationRenewal(threadId);
- }
- }
- }
- }
复制代码 org.redisson.RedissonBaseLock#renewExpiration
- private void renewExpiration() {
- ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- if (ee == null) {
- return;
- }
- // 开启时间轮,时间是10s之后执行我们的TimerTask任务
- Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- // 从EXPIRATION_RENEWAL_MAP中拿到锁的对象,有可能在定时的时候被移除取消
- ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- if (ent == null) {
- return;
- }
- Long threadId = ent.getFirstThreadId();
- if (threadId == null) {
- return;
- }
- // 给锁续期
- CompletionStage<Boolean> future = renewExpirationAsync(threadId);
- future.whenComplete((res, e) -> {
- // 异常报错,从Map移除
- if (e != null) {
- log.error("Can't update lock " + getRawName() + " expiration", e);
- EXPIRATION_RENEWAL_MAP.remove(getEntryName());
- return;
- }
- // 如果返回的是1 代表线程还占有锁,递归调用自己
- if (res) {
- // 递归再次加入时间轮
- // reschedule itself
- renewExpiration();
- } else {
- // 所不存在,这取消任务,移除相关MAP信息
- cancelExpirationRenewal(null);
- }
- });
- }
- }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
- // 设置Timeout
- ee.setTimeout(task);
- }
复制代码 org.redisson.connection.MasterSlaveConnectionManager#newTimeout
- private HashedWheelTimer timer;
- @Override
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- try {
- // 添加进入时间轮
- return timer.newTimeout(task, delay, unit);
- } catch (IllegalStateException e) {
- if (isShuttingDown()) {
- return DUMMY_TIMEOUT;
- }
-
- throw e;
- }
- }
复制代码- protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果当前锁存在
- "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 续期
- "return 1; " +
- "end; " +
- "return 0;",
- Collections.singletonList(getRawName()),
- internalLockLeaseTime, getLockName(threadId));
- }
复制代码 5、开释锁
org.redisson.RedissonLock#unlock
- @Override
- public void unlock() {
- try {
- get(unlockAsync(Thread.currentThread().getId()));
- } catch (RedisException e) {
- if (e.getCause() instanceof IllegalMonitorStateException) {
- throw (IllegalMonitorStateException) e.getCause();
- } else {
- throw e;
- }
- }
- }
复制代码 org.redisson.RedissonBaseLock.unlockAsync
- @Override
- public RFuture<Void> unlockAsync(long threadId) {
- // 进入释放锁逻辑
- RFuture<Boolean> future = unlockInnerAsync(threadId);
- CompletionStage<Void> f = future.handle((opStatus, e) -> {
- // 移除ExpirationEntry中的threadId 并且移除 EXPIRATION_RENEWAL_MAP中的ExpirationEntry watchDog就不会继续续期
- cancelExpirationRenewal(threadId);
-
- // 异常处理
- if (e != null) {
- throw new CompletionException(e);
- }
- // 不存在锁信息
- if (opStatus == null) {
- IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
- + id + " thread-id: " + threadId);
- throw new CompletionException(cause);
- }
- return null;
- });
- return new CompletableFutureWrapper<>(f);
- }
复制代码 org.redisson.RedissonBaseLock.unlockInnerAsync
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- "return nil;" +
- "end; " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "return 0; " +
- "else " +
- "redis.call('del', KEYS[1]); " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; " +
- "end; " +
- "return nil;",
- Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
- }
复制代码 开释锁LUA
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + # 锁已经释放
- "return nil;" +
- "end; " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + #重入次数减一
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " + # 重入次数还不为0 那说明占有锁,设置过期时间
- "return 0; " +
- "else " +
- "redis.call('del', KEYS[1]); " + # 重入次数为0,释放锁
- "redis.call('publish', KEYS[2], ARGV[1]); " + # 发布订阅事件,唤醒其它线程,可以去竞争锁了
- "return 1; " +
- "end; " +
- "return nil;",
复制代码 6、流程图
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|