掌握JDK21全新结构化并发编程,轻松提升开发效率!

打印 上一主题 下一主题

主题 915|帖子 915|积分 2745

1 概要

通过引入结构化并发编程的API,简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单个工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观察性。这是一个预览版的API。
2 历史

结构化并发是由JEP 428提出的,并在JDK 19中作为孵化API发布。它在JDK 20中被JEP 437重新孵化,通过对作用域值(JEP 429)进行轻微更新。
我们在这里提议将结构化并发作为JUC包中的预览API。唯一重要变化是StructuredTaskScope::fork(...)方法返回一个[子任务],而不是一个Future,如下面所讨论的。
3 目标

推广一种并发编程风格,可以消除由于取消和关闭而产生的常见风险,如线程泄漏和取消延迟。
提高并发代码的可观察性。
4 非目标

不替换JUC包中的任何并发构造,如ExecutorService和Future。
不定义Java平台的最终结构化并发API。其他结构化并发构造可以由第三方库定义,或在未来的JDK版本中定义。
不定义在线程之间共享数据流的方法(即通道)。会在未来提出这样做。
不用新的线程取消机制替换现有的线程中断机制。会在未来提出这样做。
5 动机

开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务彼此足够独立,并且存在足够的硬件资源,那么通过在不同线程中并发执行子任务,可以使整个任务运行得更快(即具有较低的延迟)。例如,将多个I/O操作的结果组合成一个任务,如果每个I/O操作都在自己的线程中并发执行,那么任务将运行得更快。虚拟线程(JEP 444)使得为每个此类I/O操作分配一个线程成为一种具有成本效益的方法,但是管理可能会产生大量线程仍然是一个挑战。
6 ExecutorService 非结构化并发

java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,它帮助开发人员以并发方式执行子任务。
如下 handle() 的方法,它表示服务器应用程序中的一个任务。它通过将两个子任务提交给 ExecutorService 来处理传入的请求。
ExecutorService 立即返回每个子任务的 Future,并根据 Executor 的调度策略同时执行这些子任务。handle() 方法通过阻塞调用它们的 Future 的 get() 方法来等待子任务的结果,因此该任务被称为加入了其子任务。
  1. Response handle() throws ExecutionException, InterruptedException {
  2.     Future<String> user = esvc.submit(() -> findUser());
  3.     Future<Integer> order = esvc.submit(() -> fetchOrder());
  4.     String theUser = user.get();   // 加入 findUser
  5.     int theOrder = order.get();    // 加入 fetchOrder
  6.     return new Response(theUser, theOrder);
  7. }
复制代码
由于子任务并发执行,每个子任务都可独立地成功或失败。在这个上下文中,"失败" 意味着抛出异常。通常,像 handle() 这样的任务应该在任何一个子任务失败时失败。当出现失败时,理解线程的生命周期会变得非常复杂:

  • 如 findUser() 抛异常,那么调用 user.get() 时 handle() 也会抛出异常,但是 fetchOrder() 会继续在自己的线程中运行。这是线程泄漏,最好情况下浪费资源,最坏情况下 fetchOrder() 的线程可能会干扰其他任务。
  • 如执行 handle() 的线程被中断,这个中断不会传播到子任务。findUser() 和 fetchOrder() 的线程都会泄漏,即使在 handle() 失败后仍然继续运行。
  • 如果 findUser() 执行时间很长,但是在此期间 fetchOrder() 失败,那么 handle() 将不必要地等待 findUser(),因为它会在 user.get() 上阻塞,而不是取消它。只有在 findUser() 完成并且 user.get() 返回后,order.get() 才会抛出异常,导致 handle() 失败。
每种case下,问题在于我们的程序在逻辑上被结构化为任务-子任务关系,但这些关系只存在于开发人员的头脑中。这不仅增加错误可能性,还会使诊断和排除此类错误变得更加困难。例如,线程转储等可观察性工具会在不相关的线程调用栈中显示 handle()、findUser() 和 fetchOrder(),而没有任务-子任务关系的提示。
可尝试在错误发生时显式取消其他子任务,例如通过在失败的任务的 catch 块中使用 try-finally 包装任务,并调用其他任务的 Future 的 cancel(boolean) 方法。我们还需要在 try-with-resources 语句中使用 ExecutorService,就像
  1. try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  2.     IntStream.range(0, 10_000).forEach(i -> {
  3.         executor.submit(() -> {
  4.             Thread.sleep(Duration.ofSeconds(1));
  5.             return i;
  6.         });
  7.     });
  8. }  // executor.close() is called implicitly, and waits
复制代码
因为 Future 没有提供等待被取消的任务的方法。但所有这些都很难做到,并且往往会使代码的逻辑意图变得更加难以理解。跟踪任务之间的关系,并手动添加所需的任务间取消边缘,是对开发人员的一种很大要求。
无限制的并发模式

这种需要手动协调生命周期的需求是因为 ExecutorService 和 Future 允许无限制的并发模式。在涉及的所有线程中,没有限制或顺序:

  • 一个线程可以创建一个 ExecutorService
  • 另一个线程可以向其提交工作
  • 执行工作的线程与第一个或第二个线程没有任何关系
线程提交工作之后,一个完全不同的线程可以等待执行的结果。具有对 Future 的引用的任何代码都可以加入它(即通过调用 get() 等待其结果),甚至可以在与获取 Future 的线程不同的线程中执行代码。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回给许多任务中的任何一个,甚至可能是没有返回给任何任务。
因为 ExecutorService 和 Future 允许这种无结构的使用,它们既不强制执行也不跟踪任务和子任务之间的关系,尽管这些关系是常见且有用的。因此,即使子任务在同一个任务中被提交和加入,一个子任务的失败也不能自动导致另一个子任务的取消。在上述的 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder() 的 Future 与 findUser() 的 Future 没有关系,也与最终通过其 get() 方法加入它的线程无关。与其要求开发人员手动管理这种取消,我们希望能够可靠地自动化这一过程。
任务结构应反映代码结构

与 ExecutorService 下的自由线程组合相反,单线程代码的执行总是强制执行任务和子任务的层次结构。方法的代码块 {...} 对应一个任务,代码块内部调用的方法对应子任务。调用的方法必须返回给调用它的方法,或者抛出异常给调用它的方法。它不能生存于调用它的方法之外,也不能返回或抛出异常给其他方法。因此,所有子任务在任务之前完成,每个子任务都是其父任务的子任务,每个子任务的生命周期相对于其他子任务和任务来说,都由代码块结构的语法规则来管理。
如单线程版本的 handle() 中,任务-子任务关系在语法结构明显:
  1. Response handle() throws IOException {
  2.     String theUser = findUser();
  3.     int theOrder = fetchOrder();
  4.     return new Response(theUser, theOrder);
  5. }
复制代码
我们不会在 findUser() 子任务完成之前启动 fetchOrder() 子任务,无论 findUser() 是成功还是失败。如果 findUser() 失败,我们根本不会启动 fetchOrder(),而且 handle() 任务会隐式地失败。一个子任务只能返回给其父任务,这是很重要的:这意味着父任务可以将一个子任务的失败隐式地视为触发来取消其他未完成的子任务,然后自己失败。
单线程代码中,任务-子任务层次关系在运行时的调用栈中得到体现。因此,我们获得了相应的父子关系,这些关系管理着错误传播。观察单个线程时,层次关系显而易见:findUser()(及后来的 fetchOrder())似乎是在 handle() 下执行的。这使得回答问题 "handle() 正在处理什么?" 很容易。
如任务和子任务之间的父子关系在代码的语法结构中明显,并且在运行时得到了体现,那并发编程将更加容易、可靠且易于观察,就像单线程代码一样。语法结构将定义子任务的生命周期,并使得能够在运行时创建一个类似于单线程调用栈的线程层次结构的表示。这种表示将实现错误传播、取消以及对并发程序的有意义的观察。
7 结构化并发

结构化并发是一种并发编程方法,它保持了任务和子任务之间的自然关系,从而实现了更具可读性、可维护性和可靠性的并发代码。"结构化并发" 这个术语由 Martin Sústrik 提出,并由 Nathaniel J. Smith 推广。从其他编程语言中的概念,如 Erlang 中的层次监控者,可以了解到结构化并发中错误处理的设计思想。
结构化并发源于一个简单的原则:
如果一个任务分解为并发的子任务,那么所有这些子任务都会返回到同一个地方,即任务的代码块。
在结构化并发中,子任务代表任务工作。任务等待子任务的结果并监视它们的失败情况。与单线程代码中的结构化编程技术类似,结构化并发在多线程中的威力来自于两个思想:

  • 为代码块中的执行流程定义明确的进入和退出点
  • 在严格的操作生命周期嵌套中,以反映它们在代码中的语法嵌套方式
由于代码块的进入和退出点被明确定义,因此并发子任务的生命周期被限定在其父任务的语法块中。因为同级子任务的生命周期嵌套在其父任务的生命周期之内,因此可以将它们作为一个单元进行推理和管理。由于父任务的生命周期,依次嵌套在其父任务的生命周期之内,运行时可以将任务层次结构实现为树状结构,类似于单线程调用栈的并发对应物。这允许代码为任务子树应用策略,如截止时间,并允许可观察性工具将子任务呈现为父任务的下属。
结构化并发非常适合虚拟线程,这是由JDK实现的轻量级线程。许多虚拟线程可以共享同一个操作系统线程,从而可以支持非常大量的虚拟线程。除此外,虚拟线程足够廉价,可以表示任何涉及I/O等并发行为。这意味着服务器应用程序可以使用结构化并发来同时处理成千上万甚至百万个传入请求:它可以为处理每个请求的任务分配一个新的虚拟线程,当一个任务通过提交子任务进行并发执行时,它可以为每个子任务分配一个新的虚拟线程。在幕后,任务-子任务关系通过为每个虚拟线程提供一个对其唯一父任务的引用来实现为树状结构,类似于调用栈中的帧引用其唯一的调用者。
总之,虚拟线程提供了大量的线程。结构化并发可以正确且强大地协调它们,并使可观察性工具能够按照开发人员的理解显示线程。在JDK中拥有结构化并发的API将使构建可维护、可靠且可观察的服务器应用程序变得更加容易。
8 描述

结构化并发 API 的主要类是 java.util.concurrent 包中的 StructuredTaskScope。该类允许开发人员将一个任务结构化为一组并发的子任务,并将它们作为一个单元进行协调。子任务通过分别分叉它们并将它们作为一个单元加入,可能作为一个单元取消,来在它们自己的线程中执行。子任务的成功结果或异常由父任务汇总并处理。StructuredTaskScope 将子任务的生命周期限制在一个清晰的词法作用域内,在这个作用域中,任务与其子任务的所有交互(分叉、加入、取消、处理错误和组合结果)都发生。
前面提到的 handle() 示例,使用 StructuredTaskScope 编写:
  1. Response handle() throws ExecutionException, InterruptedException {
  2.     try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  3.         Supplier<String> user = scope.fork(() -> findUser());
  4.         Supplier<Integer> order = scope.fork(() -> fetchOrder());
  5.         scope.join()             // 加入两个子任务
  6.              .throwIfFailed();   // ... 并传播错误
  7.         // 两个子任务都成功完成,因此组合它们的结果
  8.         return new Response(user.get(), order.get());
  9.     }
  10. }
复制代码
与原始示例相比,理解涉及的线程的生命周期在这里变得更加容易:在所有情况下,它们的生命周期都限制在一个词法作用域内,即 try-with-resources 语句的代码块内。此外,使用 StructuredTaskScope 可以确保一些有价值的属性:

  • 错误处理与短路 — 如果 findUser() 或 fetchOrder() 子任务中的任何一个失败,另一个如果尚未完成则会被取消。(这由 ShutdownOnFailure 实现的关闭策略来管理;还有其他策略可能)。
  • 取消传播 — 如果在运行 handle() 的线程在调用 join() 之前或之中被中断,则线程在退出作用域时会自动取消两个子任务。
  • 清晰性 — 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(子任务已经完成,因此没有更多需要清理的)。
  • 可观察性 — 如下所述,线程转储清楚地显示了任务层次结构,其中运行 findUser() 和 fetchOrder() 的线程被显示为作用域的子任务。
9 突破预览版限制

StructuredTaskScope 是预览版 API,默认禁用。要使用 StructuredTaskScope API,需启用预览 API:

  • 使用 javac --release 21 --enable-preview Main.java 编译程序,然后使用 java --enable-preview Main 运行它;或
  • 当使用源代码启动器时,使用 java --source 21 --enable-preview Main.java 运行程序
  • IDEA 运行时,勾选即可:
10 使用 StructuredTaskScope

10.1 API

[code]public class StructuredTaskScope implements AutoCloseable {    public <U extends T> Subtask<U> fork(Callable

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

吴旭华

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表