一文揭开JDK21虚拟线程的神秘面纱

打印 上一主题 下一主题

主题 832|帖子 832|积分 2496

虚拟线程快速体验

环境:JDK21 + IDEA
  1. public static void main(String[] args) {
  2.     try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  3.         IntStream.range(0, 10_000).forEach(i -> {
  4.             executor.submit(() -> {
  5.                 Thread.sleep(Duration.ofSeconds(1));
  6.                 return i;
  7.             });
  8.         });
  9.     }
  10. }
复制代码
运行上面的代码看下执行时间,再试下 Executors.newFixedThreadPool(20) 和 Executors.newCachedThreadPool()
不出不测的话,会发现Executors.newVirtualThreadPerTaskExecutor()运行速度最快,Executors.newCachedThreadPool()运行时系统最卡顿,Executors.newFixedThreadPool(20) 最慢。
Executors.newCachedThreadPool()卡顿是因为一个使命创建一个Platform线程,占用了太多系统资源。
Executors.newFixedThreadPool(20)运行慢是因为只有20个并发去执行1万个使命
Executors.newVirtualThreadPerTaskExecutor()雷同Executors.newCachedThreadPool(),但是创建的是虚拟线程,所以在获得高并发的同时也没有占用太多系统资源。
为什么引入虚拟线程

首先,我们来看看如今的Java线程是怎样的。
java.lang.Thread 这个类我信赖大家都不陌生,代表Java中的最小并发单元,即一个线程。它是Java对底层的操纵系统线程(OS Thread)的封装,为了区别于OS线程,我们称之为平台线程(Platform Thread)。当我们初始化一个Thread实例时,其实就是创建了一个Platform线程并将之与一个OS线程绑定(1:1)。
这种方式存在以下问题:

  • OS线程是有限的,Platform线程的创建数量受限制于OS线程
  • 因为绑定系统资源,因此线程的创建/销毁的代价都是昂贵的
这两个问题并非无解,比如,问题1的本质是垂直扩展到顶了,完全可以用水平扩展的方式解决,一台机器的OS线程不能满意需求,再增加一台便是;问题2可以通过池化技术来解决,既然线程的创建和销毁代价比较昂贵,那便将创建好的线程收集起来,推迟销毁的时机,尽量复用它。
JDK21则是在语言层面上的提供了一个更换方案,也就是本文要介绍的虚拟线程(virtual thread),熟悉linux的同学肯定知道系统线程和用户线程的区别,虚拟线程就像是JDK实现的“用户线程”,下面来重点介绍。
什么是虚拟线程

虚拟线程,可以看作是对Platform线程的轻量级封装,Platform线程和OS线程的关系是1:1,虚拟线程和Platform线程的关系则是M:N,且一般M要远远大于N。
可以直接看下虚拟线程的构造函数源码加深理解,坐标java.lang.VirtualThread#。
虚拟线程实例化
  1. final class VirtualThread extends BaseVirtualThread {
  2.     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
  3.         super(name, characteristics, /*bound*/ false);
  4.         Objects.requireNonNull(task);
  5.         // choose scheduler if not specified
  6.         if (scheduler == null) {
  7.             Thread parent = Thread.currentThread();
  8.             if (parent instanceof VirtualThread vparent) {
  9.                 scheduler = vparent.scheduler;
  10.             } else {
  11.                 scheduler = DEFAULT_SCHEDULER;
  12.             }
  13.         }
  14.         this.scheduler = scheduler;
  15.         this.cont = new VThreadContinuation(this, task);
  16.         this.runContinuation = this::runContinuation;
  17.     }
  18. }
  19. private static ForkJoinPool createDefaultScheduler() {
  20.         ForkJoinWorkerThreadFactory factory = pool -> {
  21.             PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
  22.             return AccessController.doPrivileged(pa);
  23.         };
  24.         PrivilegedAction<ForkJoinPool> pa = () -> {
  25.             int parallelism, maxPoolSize, minRunnable;
  26.             String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
  27.             String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
  28.             String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
  29.             if (parallelismValue != null) {
  30.                 parallelism = Integer.parseInt(parallelismValue);
  31.             } else {
  32.                 parallelism = Runtime.getRuntime().availableProcessors();
  33.             }
  34.             if (maxPoolSizeValue != null) {
  35.                 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
  36.                 parallelism = Integer.min(parallelism, maxPoolSize);
  37.             } else {
  38.                 maxPoolSize = Integer.max(parallelism, 256);
  39.             }
  40.             if (minRunnableValue != null) {
  41.                 minRunnable = Integer.parseInt(minRunnableValue);
  42.             } else {
  43.                 minRunnable = Integer.max(parallelism / 2, 1);
  44.             }
  45.             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
  46.             boolean asyncMode = true; // FIFO
  47.             return new ForkJoinPool(parallelism, factory, handler, asyncMode,
  48.                          0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
  49.         };
  50.         return AccessController.doPrivileged(pa);
  51.     }
复制代码
可以看到,创建虚拟线程的时候,利用了一个默认的调度器(ForkJoinPool),也就是Platform的线程池,可以看到池子的几个配置参数。

  • 最大Platform线程数:默以为系统核心数,最大为256,可以通过jdk.virtualThreadScheduler.maxPoolSize设置
这个时候,爱思考的同学大概就要问了,既然默认的最大Platform线程数为系统核心数,岂不是大大限制了并发能力?是不是要主动设置一个较大值?
答案是不必要,因为JDK在线程池的基础上实现了调度的功能。当虚拟线程启动时,调度器会将虚拟线程mount到Platform线程,此时该Platform线程被称为这个虚拟线程的carrier;当线程运行碰到IO操纵必要等候时,调度器又会将虚拟现场unmount,把Platform线程释放出来给其他虚拟线程利用,不占用CPU时间。因此,对于非CPU密集的应用,很少的Platform线程就能支持大量的虚拟线程来执行使命。事实上,对于CPU密集的应用,虚拟线程并不会带来多大的提升。虚拟线程真正的应用场景是生存周期短、调用栈浅的使命,如一次http请求、一次JDBC查询。
必要明白的是,操纵系统真正能同时运算的线程数也就只有逻辑CPU数,多出来的线程只能等候系统的调度获得CPU时间。
虚拟线程状态

stateDiagram-v2NEW --> STARTED        STARTED --> TERMINATED     STARTED --> RUNNING        RUNNING --> TERMINATED     RUNNING --> PARKING        PARKING --> PARKED         PARKING --> PINNED         PARKED --> UNPARKED       PINNED --> RUNNING        UNPARKED --> RUNNING                可以看出,虚拟线程相较原先的线程状态,多了Parked、Unparked、Pinned等状态

  • Parked:就是前面说的mount
  • Unparked:就是前面说的unmount
  • Pinned:虚拟线程阻塞时,正常会unmount,但是在一些特殊场景下,不能unmount,此时就会进入Pinned状态:

    • 阻塞操纵在 synchronized 代码块中(后续JDK大概优化这一点限制)
    • 执行 native 方法时
    Pinned状态占用了Platform线程,无疑会影响性能,官方建议对于经常执行的 synchronized 代码块,最好利用java.util.concurrent.locks.ReentrantLock 更换。如果不清楚自己代码里哪些地方利用到了 synchronized 代码块,在切换利用虚拟线程时,可以添加JVM参数jdk.tracePinnedThreads帮助排查。

总结

虚拟线程特别实用如了局景:有大量的并发使命必要执行,且使命是非CPU密集的。
虚拟线程利用上和普通的线程没有太大区别,甚至因为内置了调度逻辑和线程池,可以让开辟人员不用再考虑线程池的大小、拒绝计谋等,尤其给框架开辟者提供了新的优化思路。
对于已经利用了reactive技术的如webFlux框架,没必要再切换到虚拟线程,两者性能相当。
对于web容器如tomcat来说,自己已经利用reactor、nio等技术优化吞吐量,在小的并发数场景下,没必要切换虚拟线程,提升不大。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

写过一篇

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表