ToB企服应用市场:ToB评测及商务社交产业平台

标题: 面试官问我:线程锁导致的kafka客户端超时,如何解决? [打印本页]

作者: 河曲智叟    时间: 2024-2-26 20:51
标题: 面试官问我:线程锁导致的kafka客户端超时,如何解决?
本文分享自华为云社区《线程锁导致的kafka客户端超时问题》,作者: 张俭 。
问题背景

有一个环境的kafka client发送数据有部分超时,拓扑图也非常简单

定位历程

我们先对客户端的环境及JVM情况进行了排查,从JVM所在的虚拟机到kafka server的网络正常,垃圾回收(GC)时间也在预期范围内,没有出现异常。
紧接着,我们把目光转向了kafka 服务器,进行了一些基础的检查,同时也查看了kafka处理请求的超时日志,其中我们关心的metadata和produce请求都没有超时。
问题就此陷入了僵局,虽然也搜到了一些kafka server会对连上来的client反解导致超时的问题( https://github.com/apache/kafka/pull/10059),但通过一些简单的分析,我们确定这并非是问题所在。
同时,我们在环境上也发现一些异常情况,当时觉得不是核心问题/解释不通,没有深入去看
当排查陷入僵局,我们开始考虑其他可能的调查手段。我们尝试抓包来找线索,这里的抓包是SASL鉴权+SSL加密的,非常难读,只能靠长度和响应时间勉强来推断报文的内容。
在这个过程中,我们发现了一个非常重要的线索,客户端竟然发起了超时断链,并且超时的那条消息,实际服务端是有响应回复的。
随后我们将kafka client的trace级别日志打开,这里不禁感叹kafka client日志打的相对较少,发现的确有log.debug(“Disconnecting from node {} due to request timeout.”, nodeId);的日志打印。
与网络相关的流程:
  1. try {
  2. // 这里发出了请求
  3. client.send(request, time.milliseconds());
  4. while (client.active()) {
  5. List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
  6. for (ClientResponse response : responses) {
  7. if (response.requestHeader().correlationId() == request.correlationId()) {
  8. if (response.wasDisconnected()) {
  9. throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
  10. }
  11. if (response.versionMismatch() != null) {
  12. throw response.versionMismatch();
  13. }
  14. return response;
  15. }
  16. }
  17. }
  18. throw new IOException("Client was shutdown before response was read");
  19. } catch (DisconnectException e) {
  20. if (client.active())
  21. throw e;
  22. else
  23. throw new IOException("Client was shutdown before response was read");
  24. }
复制代码
这个poll方法,不是简单的poll方法,而在poll方法中会进行超时判断,查看poll方法中调用的handleTimedOutRequests方法
  1. @Override
  2. public List<ClientResponse> poll(long timeout, long now) {
  3. ensureActive();
  4. if (!abortedSends.isEmpty()) {
  5. // If there are aborted sends because of unsupported version exceptions or disconnects,
  6. // handle them immediately without waiting for Selector#poll.
  7. List<ClientResponse> responses = new ArrayList<>();
  8. handleAbortedSends(responses);
  9. completeResponses(responses);
  10. return responses;
  11. }
  12. long metadataTimeout = metadataUpdater.maybeUpdate(now);
  13. try {
  14. this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
  15. } catch (IOException e) {
  16. log.error("Unexpected error during I/O", e);
  17. }
  18. // process completed actions
  19. long updatedNow = this.time.milliseconds();
  20. List<ClientResponse> responses = new ArrayList<>();
  21. handleCompletedSends(responses, updatedNow);
  22. handleCompletedReceives(responses, updatedNow);
  23. handleDisconnections(responses, updatedNow);
  24. handleConnections();
  25. handleInitiateApiVersionRequests(updatedNow);
  26. // 关键的超时判断
  27. handleTimedOutRequests(responses, updatedNow);
  28. completeResponses(responses);
  29. return responses;
  30. }
复制代码
由此我们推断,问题可能在于客户端hang住了一段时间,从而导致超时断链。我们通过工具Arthas深入跟踪了Kafka的相关代码,甚至发现一些简单的操作(如A.field)也需要数秒的时间。这进一步确认了我们的猜想:问题可能出在JVM。JVM可能在某个时刻出现问题,导致系统hang住,但这并非由GC引起。

为了解决这个问题,我们又检查了监控线程CPU较高的问题。我们发现线程的执行热点是从"sun.management.ThreadImpl"中的"getThreadInfo"方法。
  1. "metrics-1@746" prio=5 tid=0xf nid=NA runnable
  2. java.lang.Thread.State: RUNNABLE
  3. at sun.management.ThreadImpl.getThreadInfo(Native Method)
  4. at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:185)
  5. at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:149)
复制代码
进一步发现,在某些版本的JDK8中,读取线程信息是需要加锁的。
至此,问题的根源已经清晰明了:过高的线程数以及线程监控时JVM全局锁的存在导致了这个问题。您可以使用如下的demo来复现这个问题
  1. import java.lang.management.ManagementFactory;
  2. import java.lang.management.ThreadInfo;
  3. import java.lang.management.ThreadMXBean;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.ScheduledExecutorService;
  6. import java.util.concurrent.TimeUnit;
  7. public class ThreadLockSimple {
  8. public static void main(String[] args) {
  9. for (int i = 0; i < 15_000; i++) {
  10. new Thread(new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. Thread.sleep(200_000);
  15. } catch (InterruptedException e) {
  16. throw new RuntimeException(e);
  17. }
  18. }
  19. }).start();
  20. }
  21. ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  22. executorService.scheduleAtFixedRate(new Runnable() {
  23. @Override
  24. public void run() {
  25. System.out.println("take " + " " + System.currentTimeMillis());
  26. }
  27. }, 1, 1, TimeUnit.SECONDS);
  28. ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
  29. ScheduledExecutorService metricsService = Executors.newSingleThreadScheduledExecutor();
  30. metricsService.scheduleAtFixedRate(new Runnable() {
  31. @Override
  32. public void run() {
  33. long start = System.currentTimeMillis();
  34. ThreadInfo[] threadInfoList = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds());
  35. System.out.println("threads count " + threadInfoList.length + " cost :" + (System.currentTimeMillis() - start));
  36. }
  37. }, 1, 1, TimeUnit.SECONDS);
  38. }
  39. }
复制代码
为了解决这个问题,我们有以下几个可能的方案:
这个问题的解决方案应根据实际情况进行选择,希望对你有所帮助。
 
点击关注,第一时间了解华为云新鲜技术~
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4