ToB企服应用市场:ToB评测及商务社交产业平台

标题: 页面查询多项数据组合的线程池设计 [打印本页]

作者: 丝    时间: 2023-11-10 19:30
标题: 页面查询多项数据组合的线程池设计
背景

我们应对并发场景时一般会采用下面方式去预估线程池的线程数量,比如QPS需求是1000,平均每个任务需要执行的时间是t秒,那么我们需要的线程数是t * 1000。
但是在一些情况下,这个t是不好估算的,即便是估算出来了,在实际的线程环境上也需要进行验证和微调。比如在本文所阐述分页查询的数据项组合场景中。
1、数据组合依赖不同的上游接接口, 它们的响应时间参差不齐,甚至差距还非常大。有些接口支持批量查询而另一些则不支持批量查询。有些接口因为性能问题还需要考虑降级和平滑方案。
2、为了提升用户体验,这里的查询设计了动态列,因此每一次访问所需要组合的数据项和数量也是不同的。
因此这里如果需要估算出一个合理的t是不太现实的。
方案

一种可动态调节的策略,根据监控的反馈对线程池进行微调。整体设计分为装配逻辑线程池封装设计。
1、装配逻辑

查询结果,拆分分片(水平拆分),并行装配(垂直拆分),获得装配项列表(动态列), 并行装配每一项。

2、线程池封装

可调节的核心线程数、最大线程数、线程保持时间,队列大小,提交任务重试等待时间,提交任务重试次数。 固定异常拒绝策略。
调节参数:
字段名称说明corePoolSize核心线程数参考线程池定义maximumPoolSize最大线程数参考线程池定义keepAliveTime线程存活时间参考线程池定义queueSize队列长度参考线程池定义resubmitSleepMillis提交任务重试等待时间添加任务被拒绝后重试时的等待时间resubmitTimes提交任务重试次数添加任务被拒绝后重试添加的最大次数
  1.     @Data
  2.         private static class PoolPolicy {
  3.                 /** 核心线程数 */
  4.                 private Integer corePoolSize;
  5.                 /** 最大线程数 */
  6.                 private Integer maximumPoolSize;
  7.                 /** 线程存活时间 */
  8.                 private Integer keepAliveTime;
  9.                 /** 队列容量 */
  10.                 private Integer queueSize;
  11.                 /** 重试等待时间 */
  12.                 private Long resubmitSleepMillis;
  13.                 /** 重试次数 */
  14.                 private Integer resubmitTimes;
  15.         }
复制代码
创建线程池:
线程池的创建考虑了动态的需求,满足根据压测结果进行微调的要求。首先缓存旧的线程池后再创建新的线程,当新的线程池创建成功后再去关闭旧的线程池。保证在这个替换过程中不影响正在执行的业务。线程池使用了中断策略,用户可以及时感知到系统繁忙并保证了系统资源占用的安全。
  1. public void reloadThreadPool(PoolPolicy poolPolicy) {
  2.     if (poolPolicy == null) {
  3.         throw new RuntimeException("The thread pool policy cannot be empty.");
  4.     }
  5.     if (poolPolicy.getCorePoolSize() == null) {
  6.         poolPolicy.setCorePoolSize(0);
  7.     }
  8.     if (poolPolicy.getMaximumPoolSize() == null) {
  9.         poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
  10.     }
  11.     if (poolPolicy.getKeepAliveTime() == null) {
  12.         poolPolicy.setKeepAliveTime(60);
  13.     }
  14.     if (poolPolicy.getQueueSize() == null) {
  15.         poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
  16.     }
  17.     if (poolPolicy.getResubmitSleepMillis() == null) {
  18.         poolPolicy.setResubmitSleepMillis(200L);
  19.     }
  20.     if (poolPolicy.getResubmitTimes() == null) {
  21.         poolPolicy.setResubmitTimes(5);
  22.     }
  23.     // - 线程池策略没有变化直接返回已有线程池。
  24.     ExecutorService original = this.executorService;
  25.     this.executorService = new ThreadPoolExecutor(
  26.             poolPolicy.getCorePoolSize(),
  27.             poolPolicy.getMaximumPoolSize(),
  28.             poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
  29.             new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
  30.             new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
  31.             new ThreadPoolExecutor.AbortPolicy());
  32.     this.poolPolicy = poolPolicy;
  33.     if (original != null) {
  34.         original.shutdownNow();
  35.     }
  36. }
复制代码
任务提交:
线程池封装对象中使用的线程池拒绝策略是AbortPolicy,因此在线程数和阻塞队列到达上限后会触发异常。另外在这里为了保证提交的成功率利用重试策略实现了一定程度的延迟处理,具体场景中可以结合业务特点进行适当的调节和配置。
  1. public <T> Future<T> submit(Callable<T> task) {
  2.     RejectedExecutionException exception = null;
  3.     Future<T> future = null;
  4.     for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
  5.         try {
  6.             // - 添加任务
  7.             future = this.executorService.submit(task);
  8.             exception = null;
  9.             break;
  10.         } catch (RejectedExecutionException e) {
  11.             exception = e;
  12.             this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
  13.         }
  14.     }
  15.     if (exception != null) {
  16.         throw exception;
  17.     }
  18.     return future;
  19. }
复制代码
监控:
1、submit提交的监控
见代码中的「监控点①」,在submit方法中添加监控点,监控key的需要添线程池封装对象的线程名称前缀,用于区分具体的线程池对象。
「监控点①」用于监控添加任务的动作是否正常,以便对线程池对象及策略参数进行微调。
  1. public <T> Future<T> submit(Callable<T> task) {
  2.     // - 监控点①
  3.     CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
  4.                 UmpConstant.APP_NAME,
  5.                 UmpConstant.UMP_DISABLE_HEART,
  6.                 UmpConstant.UMP_ENABLE_TP);
  7.     RejectedExecutionException exception = null;
  8.     Future<T> future = null;
  9.     for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
  10.         try {
  11.             // - 添加任务
  12.             future = this.executorService.submit(task);
  13.             exception = null;
  14.             break;
  15.         } catch (RejectedExecutionException e) {
  16.             exception = e;
  17.             this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
  18.         }
  19.     }
  20.     if (exception != null) {
  21.         // - 监控点①
  22.         Profiler.functionError(callerInfo);
  23.         throw exception;
  24.     }
  25.     // - 监控点①
  26.     Profiler.registerInfoEnd(callerInfo);
  27.     return future;
  28. }
复制代码
2、线程池并行任务
见代码的「监控点②」,分别在添加任务和任务完成后。
「监控点②」实时统计在线程中执行的总任务数量,用于评估线程池的任务的数量的满载水平。
  1. /** 任务并行数量统计 */
  2. private AtomicInteger parallelTaskCount = new AtomicInteger(0);
  3. public <T> Future<T> submit(Callable<T> task) {
  4.     RejectedExecutionException exception = null;
  5.     Future<T> future = null;
  6.     for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
  7.         try {
  8.             // - 添加任务
  9.             future = this.executorService.submit(()-> {
  10.                 T rst = task.call();
  11.                 // - 监控点②
  12.                 log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());
  13.                 return rst;
  14.             });
  15.             // - 监控点②
  16.             log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());
  17.             exception = null;
  18.             break;
  19.         } catch (RejectedExecutionException e) {
  20.             exception = e;
  21.             this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
  22.         }
  23.     }
  24.     if (exception != null) {
  25.         throw exception;
  26.     }
  27.     return future;
  28. }
复制代码
3、调节

线程池封装对象策略的调节时机
1)上线前基于流量预估的压测阶段;
2)上线后跟进监控数据和线程池中任务的满载水平进行人工微调,也可以通过JOB在指定的时间自动调整;
3)大促前依据往期大促峰值来调高相关参数。
线程池封装对象策略的调节经验
1)访问时长要求较低时,我们可以考虑调小线程数和阻塞队列,适当调大提交任务重试等待时间和次数,以便降低资源占用。
2)访问时长要求较高时,就需要调大线程数并保证相对较小的阻塞队列,调小提交任务的重试等待时间和次数甚至分别调成0和1(即关闭重试提交逻辑)。
作者:京东零售 王文明
来源:京东云开发者社区 转载请注明来源

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4