redisson内存走漏问题排查

打印 上一主题 下一主题

主题 552|帖子 552|积分 1656

问题形貌

最近生产有个服务突然出现频繁告警,接口P99响应时间变长,运维同学观察到相应的pod cpu飙升,内存占用很高。
cpu升高问题排查是老生常谈的话题了,一般可以利用top -p pid -H查看是哪个线程占用cpu高,再结合jstack找到对应的java线程代码。
不外履历告诉我们,cpu升高另有另外一个更常见的原因,内存不足导致频繁gc。垃圾收集器回收内存后又很快不足,继续回收,循环这个过程,而gc期间涉及到STW,用户线程会被挂起,响应时间天然会增加。这里的内存不足可能是正常的服务本身内存就不够用,也可以是异常的程序bug导致内存溢出。
果不其然,其时节点的full gc时间陡增,通过jstat -gcutil pid 500 30也可以看到fc非常频繁。如图:

这个问题现实月初也出现过,其时研发同学和运维同学通过重启暂时解决,今天又出现了,看来不是简单通过“重启大法”能解决的,这次我们需要分析解决它。
排查过程

这次我们通过heap dump将堆导出分析,命令:
  1. jmap -dump:format=b,file=./pid.hprof pid
复制代码
用jdk自带的virsualvmidea virsualvm launcher插件打开堆文件可以看到

很显着,跟redisson相关,我们利用的版本是3.17.1!查找服务涉及到redisson的地方并不多,调用量高且可疑的只有一处,简化后的代码如下:
  1. RLock lock = this.redissonClient.getLock("mytest");
  2. lock.tryLock(50, 100, TimeUnit.MILLISECONDS);
  3.         
  4. //业务代码...
  5. RLock lock2 = this.redissonClient.getLock("mytest");
  6. if (lock2.isLocked() && lock2.isHeldByCurrentThread()) {
  7.   lock2.unlock();
  8. }
复制代码
起首我们先简单分析下RedissonLock tryLock和unlock的源码,主要地方添加了备注。
  1.     @Override
  2.     public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  3.         long time = unit.toMillis(waitTime);
  4.         long current = System.currentTimeMillis();
  5.         long threadId = Thread.currentThread().getId();
  6.         Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  7.         // 获取到锁,返回成功
  8.         if (ttl == null) {
  9.             return true;
  10.         }
  11.         
  12.         time -= System.currentTimeMillis() - current;
  13.         if (time <= 0) {
  14.             //或取不到锁,且超过等待时间,返回失败
  15.             acquireFailed(waitTime, unit, threadId);
  16.             return false;
  17.         }
  18.         
  19.         current = System.currentTimeMillis();
  20.         //订阅锁释放消息,subscribe是本次的核心!!!
  21.         CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  22.         try {
  23.             subscribeFuture.get(time, TimeUnit.MILLISECONDS);
  24.         } catch (ExecutionException | TimeoutException e) {
  25.             //超时,获取锁失败
  26.             if (!subscribeFuture.cancel(false)) {
  27.                 subscribeFuture.whenComplete((res, ex) -> {
  28.                     if (ex == null) {
  29.                         unsubscribe(res, threadId);
  30.                     }
  31.                 });
  32.             }
  33.             acquireFailed(waitTime, unit, threadId);
  34.             return false;
  35.         }
  36.         try {
  37.             time -= System.currentTimeMillis() - current;
  38.             if (time <= 0) {
  39.                 acquireFailed(waitTime, unit, threadId);
  40.                 return false;
  41.             }
  42.         
  43.             //锁释放了,还未超时,自旋尝试获取
  44.             while (true) {
  45.                 long currentTime = System.currentTimeMillis();
  46.                 ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  47.                 // 获取到锁,返回成功
  48.                 if (ttl == null) {
  49.                     return true;
  50.                 }
  51.                 time -= System.currentTimeMillis() - currentTime;
  52.                 if (time <= 0) {
  53.                     //或取不到锁,且超过等待时间,返回失败
  54.                     acquireFailed(waitTime, unit, threadId);
  55.                     return false;
  56.                 }
  57.                 // 等待锁释放
  58.                 currentTime = System.currentTimeMillis();
  59.                 if (ttl >= 0 && ttl < time) {
  60.                     commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  61.                 } else {
  62.                     commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  63.                 }
  64.                 time -= System.currentTimeMillis() - currentTime;
  65.                 if (time <= 0) {
  66.                     //或取不到锁,且超过等待时间,返回失败
  67.                     acquireFailed(waitTime, unit, threadId);
  68.                     return false;
  69.                 }
  70.             }
  71.         } finally {
  72.             //取消订阅
  73.             unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
  74.         }
  75.     }
复制代码
AsyncSemaphore主要代码如下,permits是1,listeners是一个无界队列。在我们dump出来的异常实例中有一个AsyncSemaphore lambda对象,也有CompletableFuture lambda对象,看起来和这里高度匹配,这里大概率就是问题所在了,应该是在某种情况下,acquire后没有调用release,导致其它线程调用decrementAndGet的时候是 listener.run());    }    private void tryRun() {        while (true) {            if (counter.decrementAndGet() >= 0) {                CompletableFuture future = listeners.poll();                if (future == null) {                    counter.incrementAndGet();                    return;                }                if (future.complete(null)) {                    return;                }            }            if (counter.incrementAndGet() ... listeners) {        CompletableFuture... listeners) {        //...        subscribeFuture.whenComplete((res, e) - {            if (e != null) {                lock.release();                return;            }            if (!promise.complete(connEntry)) {                if (!connEntry.hasListeners(channelName)) {                    unsubscribe(type, channelName)                        .whenComplete((r, ex) -> {                            //这里不会被执行,AsyncSemaphore release没有执行!                            lock.release();                        });                } else {                    lock.release();                }            } else {                lock.release();            }        });        return subscribeFuture;} public CompletableFuture unsubscribe(PubSubType topicType, ChannelName channelName) {        //...        BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {            @Override            public boolean onStatus(PubSubType type, CharSequence channel) {                //这个if不会进入...                if (type == topicType && channel.equals(channelName)) {                    executed.set(true);                    if (entry.release() == 1) {                        MasterSlaveEntry msEntry = getEntry(channelName);                        msEntry.returnPubSubConnection(entry.getConnection());                    }                    //触发外面whenComplete的执行                    result.complete(null);                    return true;                }                return false;            }        };        ChannelFuture future;        //这里是unsubscribe和punsubscribe,而前面传进来的topicType是subscribe,对不上了        if (topicType == PubSubType.UNSUBSCRIBE) {            future = entry.unsubscribe(channelName, listener);        } else {            future = entry.punsubscribe(channelName, listener);        }        return result;}[/code]问题复现

前面分析得头头是道,我们还得通过实践证明一下,有理有据才行。
我的复现代码如下,通过并发调用加锁,开始运行加个断点在org.redisson.pubsub.PublishSubscribeService#unsubscribe里的BaseRedisPubSubListener的onStatus方法,发现正如前面所说,topicType确实对不上。接着运行一段时间后,打一个断点在AsyncSemaphore.acquire方法,观察到listener属性的size不断增长,通过jmap pid GC.run触发gc后也不会回收,问题得以复现。
  1.     @Override
  2.     public RFuture<Void> unlockAsync(long threadId) {
  3.         RFuture<Boolean> future = unlockInnerAsync(threadId);
  4.         CompletionStage<Void> f = future.handle((opStatus, e) -> {
  5.             //取消锁续期
  6.             cancelExpirationRenewal(threadId);
  7.             //...
  8.         });
  9.         return new CompletableFutureWrapper<>(f);
  10.     }
  11.     protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  12.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  13.                 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  14.                         "return nil;" +
  15.                         "end; " +
  16.                         "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  17.                         "if (counter > 0) then " +
  18.                         "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  19.                         "return 0; " +
  20.                         "else " +
  21.                         "redis.call('del', KEYS[1]); " +
  22.                         "redis.call('publish', KEYS[2], ARGV[1]); " +
  23.                         "return 1; " +
  24.                         "end; " +
  25.                         "return nil;",
  26.                 Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
  27.     }
复制代码


问题解决

在开始排查问题的时候,笔者就在github提issue咨询是什么原因,怎样解决。他们的回复是跟这个相关,并保举升级到3.21.2版本,不外里面提到的形貌跟我的不太一样,所以按照版本选择的履历,我决定将版本升级到3.17末了一个小版本3.17.7试一下,重新跑上面的测试代码,跑一段时间后,发现问题没有出现了。

查看org.redisson.pubsub.PublishSubscribeService#unsubscribe源码,发现出问题那段逻辑已经被修复了。

履历总结

遇到难啃问题几乎是每个开发不可制止的事变,解决问题的过程,方法和事后复盘,履历总结非常重要,对个人的学习和本领提升有很大的帮助。
以下几点是我本次的总结:

  • 及时止损
    当生产出现问题,许多开发同学起首会想怎样找到原因,解决根本问题,但现实情况应该是评估影响,及时止损,制止问题发散,扩大影响。
    例如不能在短时间内解决的,还要下来慢慢看日志,分析代码的,能回滚的先回滚,能重启的先重启,夺取在出现资损前解决问题,减少对业务产生影响。
  • 向上汇报
    遇到棘手问题不要闷声自己想办法解决,正确做法是先向你的leader汇报问题和风险。如果问题比力棘手和严重,可以哀求协助,制止因为个人本领不足迟迟不能解决问题,小问题拖成大问题。
  • 保存现场
    有时候问题是难以复现的,像我们本次的情况一个月可能就出现一次,如果直接重启服务,那么等下次问题出现就非常久了。所以正确的做法是保存现场,同时要不影响业务,可以保存一个节点,将其流量摘除,通过jstack/jmap dump出程序堆栈,其它节点重启。
  • 保持耐心
    有些问题不是一时半会就能解决的,有的以天为单位,有的可能要一个月才解决。所以保持耐心很重要,多看看官方文档,github issue,分析源码,尝试各种方式,排除各种可能,相信总会找到解决方法。
  • 版本选择
    我们选择的redisson版本是3.17.1,现实这个选择不是很好。按照x.y.z的版本规范,x表示大版本,通常是有重大更新,y表示小版本,通常是一些功能迭代,z表示修复版本,通常是修bug用的。例如springboot从2.x升级到3.0,jdk版本要求最低17,是一个非常重大的更新。
    上面我为什么选择3.17.7来测试,是因为3.17.7是3.17的末了一个小版本,看到这个版本的release报告你就知道是为什么了,它全部都是在修bug。
    固然本次的问题修复不一定在.7这个版本,可能是在1-7之间的某个版本,有兴趣的可以再细看下。

更多分享,接待关注我的github:https://github.com/jmilktea/jtea

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

李优秀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表