Java 原生异步编程与Spring 异步编程 详解

打印 上一主题 下一主题

主题 1622|帖子 1622|积分 4876

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
简介

Java 异步编程是现代高性能应用开发的焦点技能之一,它允许程序在实验耗时使用(如网络哀求、文件 IO)时不必阻塞主线程,从而提高系统吞吐量和响应性。
异步 vs 同步


  • 同步:任务按顺序实验,后续任务需等待前任务完成。
  1. public String syncTask() {
  2.     // 模拟耗时操作
  3.     Thread.sleep(1000);
  4.     return "Result";
  5. }
复制代码

  • 异步:任务并行或在背景实验,主线程立刻返回。
  1. public CompletableFuture<String> asyncTask() {
  2.     return CompletableFuture.supplyAsync(() -> {
  3.         try {
  4.             Thread.sleep(1000);
  5.         } catch (InterruptedException e) {
  6.             throw new RuntimeException(e);
  7.         }
  8.         return "Result";
  9.     });
  10. }
复制代码
Java 原生异步支持

手动创建线程

最基本的异步方式是创建 Thread 或实现 Runnable。

  • 缺点:管理线程池困难,资源浪费,难以复用,缺乏效果处理机制。
  1. public class BasicAsync {
  2.     public static void main(String[] args) {
  3.         Thread thread = new Thread(() -> {
  4.             try {
  5.                 Thread.sleep(1000);
  6.                 System.out.println("Task completed");
  7.             } catch (InterruptedException e) {
  8.                 e.printStackTrace();
  9.             }
  10.         });
  11.         thread.start();
  12.         System.out.println("Main thread continues");
  13.     }
  14. }
复制代码
使用 ExecutorService


  • 优点:提供线程池管理,复用线程,减少创建开销
  • 缺点:Future.get() 是阻塞的,难以链式调用
  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPoolExample {
  4.     public static void main(String[] args) {
  5.         ExecutorService executor = Executors.newFixedThreadPool(2);
  6.         executor.submit(() -> {
  7.             Thread.sleep(1000);
  8.             System.out.println("Task 1 completed");
  9.         });
  10.         executor.submit(() -> {
  11.             Thread.sleep(500);
  12.             System.out.println("Task 2 completed");
  13.         });
  14.         executor.shutdown();
  15.     }
  16. }
复制代码
常用方法:


  • submit(Runnable):提交无返回值的任务。
  • submit(Callable):提交有返回值的任务,返回 Future。
  • shutdown():关闭线程池,不接受新任务。
线程池类型:


  • Executors.newFixedThreadPool(n):固定大小线程池。
  • Executors.newCachedThreadPool():动态调解线程数。
  • Executors.newSingleThreadExecutor():单线程实验。
线程池类型对比:
类型特性适用场景FixedThreadPool固定线程数,无界队列负载稳定的长期任务CachedThreadPool自动扩容,60秒闲置回收短时突发任务ScheduledThreadPool支持定时/周期性任务心跳检测、定时报表WorkStealingPool使用 ForkJoinPool,任务窃取算法盘算密集型并行任务Future(Java 5+)
  1. import java.util.concurrent.*;
  2. public class FutureExample {
  3.     public static void main(String[] args) throws Exception {
  4.         ExecutorService executor = Executors.newFixedThreadPool(1);
  5.         Future<String> future = executor.submit(() -> {
  6.             Thread.sleep(1000);
  7.             return "Task completed";
  8.         });
  9.         // 主线程继续
  10.         System.out.println("Doing other work");
  11.         // 阻塞获取结果
  12.         String result = future.get(); // 等待任务完成
  13.         System.out.println(result);
  14.         executor.shutdown();
  15.     }
  16. }
复制代码
方法


  • get():阻塞获取效果。
  • isDone():检查任务是否完成。
  • cancel(boolean):取消任务。
缺点


  • get() 是阻塞的,不利于非阻塞编程。
  • 难以组合多个异步任务。
CompletableFuture(Java 8+)

支持链式调用,真正现代化异步编程方式。
  1. import java.util.concurrent.CompletableFuture;
  2. public class CompletableFutureExample {
  3.     public static void main(String[] args) {
  4.         CompletableFuture.supplyAsync(() -> {
  5.             try {
  6.                 Thread.sleep(1000);
  7.             } catch (InterruptedException e) {
  8.                 throw new RuntimeException(e);
  9.             }
  10.             return "Task result";
  11.         })
  12.         .thenApply(result -> result.toUpperCase()) // 转换结果
  13.         .thenAccept(result -> System.out.println(result)) // 消费结果
  14.         .exceptionally(throwable -> {
  15.             System.err.println("Error: " + throwable.getMessage());
  16.             return null;
  17.         });
  18.         System.out.println("Main thread continues");
  19.     }
  20. }
复制代码
假造线程(Java 21+,Project Loom)

假造线程是 Java 21 引入的轻量级线程,适合高并发 I/O 密集型任务。
  1. public class VirtualThreadExample {
  2.     public static void main(String[] args) {
  3.         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  4.             executor.submit(() -> {
  5.                 try {
  6.                     Thread.sleep(1000);
  7.                     System.out.println("Task completed in virtual thread");
  8.                 } catch (InterruptedException e) {
  9.                     e.printStackTrace();
  10.                 }
  11.             });
  12.         }
  13.         System.out.println("Main thread continues");
  14.     }
  15. }
复制代码
优势

  • 轻量级,创建开销极低(相比传统线程)。
  • 适合 I/O 密集型任务(如 HTTP 哀求、数据库查询)。
注意

  • 不适合 CPU 密集型任务(可能导致线程饥饿)。
  • Spring Boot 3.2+ 支持假造线程(需配置)。
阻塞 vs 非阻塞

类型是否阻塞获取效果方式Future✅ 是future.get()(阻塞)CompletableFuture✅(get) ❌(then)支持非阻塞链式处理@Async + Future/CompletableFuture✅get() 或回调WebFlux❌ 完全非阻塞响应式 Mono / FluxFuture vs CompletableFuture:焦点对比

功能FutureCompletableFutureJava 版本Java 5+Java 8+是否可组合❌ 不支持✅ 支持链式组合、并行实验支持异步回调❌ 无✅ 有 .thenApply()、.thenAccept() 等支持非常处理❌ 无✅ 有 .exceptionally() 等可取消✅ 支持 cancel()✅ 也支持阻塞获取✅ get() 阻塞✅ get() 阻塞(也可非阻塞)使用场景简单线程任务多异步任务组合、复杂控制流Spring 异步编程(基于 @Async)

配置类或启动类启用异步支持
  1. @SpringBootApplication
  2. @EnableAsync
  3. public class Application {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(Application.class, args);
  6.     }
  7. }
复制代码
  1. @Configuration
  2. @EnableAsync
  3. public class AsyncConfig {
  4. }
复制代码
无返回值用法
  1. // 无返回值的异步方法
  2. @Async
  3. public void sendEmail(String to) {
  4.     System.out.println("异步发送邮件给: " + to);
  5.     try { Thread.sleep(2000); } catch (InterruptedException e) {}
  6.     System.out.println("邮件发送完成");
  7. }
复制代码
使用 Future

创建异步方法
  1. @Service
  2. public class AsyncService {
  3.     @Async
  4.     public Future<String> processTask() {
  5.         // 模拟耗时操作
  6.         return new AsyncResult<>("Task completed");
  7.     }
  8. }
复制代码
调用并获取效果:
  1. @Autowired
  2. private AsyncService asyncService;
  3. public void executeTask() throws Exception {
  4.     Future<String> future = asyncService.processTask();
  5.     String result = future.get(); // 阻塞等待结果
  6. }
复制代码
使用 CompletableFuture

创建异步方法
  1. @Async
  2. public CompletableFuture<String> asyncMethod() {
  3.     return CompletableFuture.completedFuture("Async Result");
  4. }
复制代码
调用方式:
  1. CompletableFuture<String> result = asyncService.asyncMethod();
  2. // 非阻塞,可以做其他事
  3. String value = result.get(); // 阻塞获取
复制代码
线程池配置

使用自定义配置类
  1. @Configuration
  2. public class AsyncConfig {
  3.     @Bean("taskExecutor")
  4.     public Executor taskExecutor() {
  5.         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6.         executor.setCorePoolSize(5);      // 核心线程数
  7.         executor.setMaxPoolSize(20);      // 最大线程数
  8.         executor.setQueueCapacity(100);   // 队列容量
  9.         executor.setKeepAliveSeconds(30); // 空闲线程存活时间
  10.         executor.setThreadNamePrefix("async-task-");
  11.         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  12.         executor.initialize();
  13.         return executor;
  14.     }
  15. }
  16. // 指定线程池
  17. @Async("taskExecutor")
  18. public Future<String> customPoolTask() { ... }
复制代码
使用配置文件
  1. # application.yml
  2. spring:
  3.   task:
  4.     execution:
  5.       pool:
  6.         core-size: 5
  7.         max-size: 20
  8.         queue-capacity: 100
  9.         thread-name-prefix: async-
  10.       shutdown:
  11.         await-termination: true
  12.         terminate-on-timeout: true
复制代码
Spring WebFlux 示例
  1. @Service
  2. public class UserService {
  3.     public Mono<String> getUser() {
  4.         return Mono.just("用户信息").delayElement(Duration.ofSeconds(2));
  5.     }
  6.     public Flux<String> getAllUsers() {
  7.         return Flux.just("用户1", "用户2", "用户3").delayElements(Duration.ofSeconds(1));
  8.     }
  9. }
复制代码
  1. @RestController
  2. @RequestMapping("/users")
  3. public class UserController {
  4.     @Autowired
  5.     private UserService userService;
  6.     @GetMapping("/one")
  7.     public Mono<String> getUser() {
  8.         return userService.getUser();
  9.     }
  10.     @GetMapping("/all")
  11.     public Flux<String> getAllUsers() {
  12.         return userService.getAllUsers();
  13.     }
  14. }
复制代码
调用时非阻塞举动体现

  • Mono 表示未来异步返回一个值;
  • Flux 表示异步返回多个值;
  • 哀求立刻返回 Publisher,只有订阅时才开始实验(懒实验、非阻塞);
  • 它不占用线程,不会“卡死线程”等待值返回。
SpringBoot 集成示例


  • 标记 @Async 注解:
@Async 标记方法为异步实验,Spring 在线程池中运行该方法。
  1. import org.springframework.scheduling.annotation.Async;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class AsyncService {
  5.     @Async
  6.     public CompletableFuture<String> doAsyncTask() {
  7.         try {
  8.             Thread.sleep(1000);
  9.         } catch (InterruptedException e) {
  10.             throw new RuntimeException(e);
  11.         }
  12.         return CompletableFuture.completedFuture("Task completed");
  13.     }
  14. }
复制代码

  • 启用异步
在主类或配置类上添加 @EnableAsync。
  1. @SpringBootApplication
  2. @EnableAsync
  3. public class Application {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(Application.class, args);
  6.     }
  7. }
复制代码

  • 控制器调用异步方法
  1. @RestController
  2. public class AsyncController {
  3.     @Autowired
  4.     private AsyncService asyncService;
  5.     @GetMapping("/async")
  6.     public String triggerAsync() {
  7.         asyncService.doAsyncTask().thenAccept(result -> System.out.println(result));
  8.         return "Task triggered";
  9.     }
  10. }
复制代码

  • 自定义线程池
Spring 默认使用 SimpleAsyncTaskExecutor,不适合生产环境。保举配置自定义线程池。
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  4. @Configuration
  5. public class AsyncConfig {
  6.     @Bean(name = "taskExecutor")
  7.     public ThreadPoolTaskExecutor taskExecutor() {
  8.         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  9.         executor.setCorePoolSize(2);
  10.         executor.setMaxPoolSize(10);
  11.         executor.setQueueCapacity(25);
  12.         executor.setThreadNamePrefix("AsyncThread-");
  13.         executor.initialize();
  14.         return executor;
  15.     }
  16. }
复制代码

  • 指定线程池:
  1. @Async("taskExecutor")
  2. public CompletableFuture<String> doAsyncTask() {
  3.     // 异步逻辑
  4. }
复制代码

  • 为 @Async 方法定义全局非常处理器
  1. @Component
  2. public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
  3.     @Override
  4.     public void handleUncaughtException(Throwable ex, Method method, Object... params) {
  5.         System.err.println("Async error: " + ex.getMessage());
  6.     }
  7. }
复制代码

  • Spring Boot 测试:
  1. @SpringBootTest
  2. public class AsyncServiceTest {
  3.     @Autowired
  4.     private AsyncService asyncService;
  5.     @Test
  6.     void testAsync() throws Exception {
  7.         CompletableFuture<String> future = asyncService.doAsyncTask();
  8.         assertEquals("Task completed", future.get(2, TimeUnit.SECONDS));
  9.     }
  10. }
复制代码
并行调用多个服务示例

并行调用 getUser 和 getProfile,总耗时接近较慢的任务(~1s)。
  1. @Service
  2. public class UserService {
  3.     @Async
  4.     public CompletableFuture<User> getUser(Long id) {
  5.         return CompletableFuture.supplyAsync(() -> {
  6.             // 模拟远程调用
  7.             try {
  8.                 Thread.sleep(1000);
  9.             } catch (InterruptedException e) {
  10.                 throw new RuntimeException(e);
  11.             }
  12.             return new User(id, "User" + id);
  13.         });
  14.     }
  15.     @Async
  16.     public CompletableFuture<Profile> getProfile(Long id) {
  17.         return CompletableFuture.supplyAsync(() -> {
  18.             try {
  19.                 Thread.sleep(500);
  20.             } catch (InterruptedException e) {
  21.                 throw new RuntimeException(e);
  22.             }
  23.             return new Profile(id, "Profile" + id);
  24.         });
  25.     }
  26. }
  27. @RestController
  28. public class UserController {
  29.     @Autowired
  30.     private UserService userService;
  31.     @GetMapping("/user/{id}")
  32.     public CompletableFuture<UserProfile> getUserProfile(@PathVariable Long id) {
  33.         return userService.getUser(id)
  34.             .thenCombine(userService.getProfile(id),
  35.                 (user, profile) -> new UserProfile(user, profile));
  36.     }
  37. }
复制代码
异步批量处理示例

并行处理 10 个任务,明显减少总耗时。
  1. @Service
  2. public class BatchService {
  3.     @Async
  4.     public CompletableFuture<Void> processItem(int item) {
  5.         return CompletableFuture.runAsync(() -> {
  6.             try {
  7.                 Thread.sleep(100);
  8.                 System.out.println("Processed item: " + item);
  9.             } catch (InterruptedException e) {
  10.                 throw new RuntimeException(e);
  11.             }
  12.         });
  13.     }
  14. }
  15. @RestController
  16. public class BatchController {
  17.     @Autowired
  18.     private BatchService batchService;
  19.     @PostMapping("/batch")
  20.     public CompletableFuture<Void> processBatch() {
  21.         List<CompletableFuture<Void>> futures = new ArrayList<>();
  22.         for (int i = 1; i <= 10; i++) {
  23.             futures.add(batchService.processItem(i));
  24.         }
  25.         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
  26.     }
  27. }
复制代码
MyBatis Plus 集成示例

MyBatis Plus 默认阻塞,可通过 @Async 或线程池异步化。
  1. @Service
  2. public class ReactiveService {
  3.     public Mono<String> fetchData() {
  4.         return Mono.just("Data")
  5.                    .delayElement(Duration.ofSeconds(1));
  6.     }
  7. }
  8. @RestController
  9. public class ReactiveController {
  10.     @Autowired
  11.     private ReactiveService reactiveService;
  12.     @GetMapping("/data")
  13.     public Mono<String> getData() {
  14.         return reactiveService.fetchData();
  15.     }
  16. }
复制代码
注意事项


  • @Async 方法必须是 public 的。
  • 不能在同一类内调用 @Async 方法(因 Spring AOP 代理机制)。
  • 默认线程池由 Spring 提供,可自定义。
CompletableFuture 所有焦点 API


  • supplyAsync():异步实验任务,返回值
  • runAsync():异步实验任务,无返回值
  • thenApply():接收前面任务效果并返回新效果
  • thenAccept():接收效果但无返回
  • thenRun():不接收效果也不返回,仅实验
  • thenCompose():嵌套异步任务
  • thenCombine():两个任务都完成后,归并效果
  • allOf():等多个任务全部完成
  • anyOf():任一任务完成即继续
  • exceptionally():捕捉非常并处理
  • whenComplete():无论乐成失败都实验
  • handle():可处理正常或非常效果
CompletableFuture 用法详解

创建异步任务

supplyAsync:基本异步任务实验
  1. @Repository
  2. public interface UserRepository extends JpaRepository<User, Long> {}
  3. @Service
  4. public class UserService {
  5.     @Autowired
  6.     private UserRepository userRepository;
  7.     @Async
  8.     public CompletableFuture<User> findUser(Long id) {
  9.         return CompletableFuture.supplyAsync(() -> userRepository.findById(id).orElse(null));
  10.     }
  11. }
复制代码
runAsync:异步实验任务,无返回值
  1. @Mapper
  2. public interface UserMapper extends BaseMapper<User> {}
  3. @Service
  4. public class UserService {
  5.     @Autowired
  6.     private UserMapper userMapper;
  7.     @Async
  8.     public CompletableFuture<User> getUser(Long id) {
  9.         return CompletableFuture.supplyAsync(() -> userMapper.selectById(id));
  10.     }
  11. }
复制代码
任务转换

thenApply(Function):转换效果,对效果加工
  1. CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Result");
复制代码
thenCompose(Function):扁平化链式异步
  1. CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("Async run"));
复制代码
thenCombine(CompletionStage, BiFunction):两个任务完成后归并效果
  1. CompletableFuture<String> future = CompletableFuture
  2.     .supplyAsync(() -> "data")
  3.     .thenApply(data -> data.toUpperCase());
  4. System.out.println(future.get()); // DATA
复制代码
  1. CompletableFuture<String> composed = CompletableFuture
  2.     .supplyAsync(() -> "A")
  3.     .thenCompose(a -> CompletableFuture.supplyAsync(() -> a + "B"));
  4. composed.thenAccept(System.out::println); // 输出 AB
复制代码
消费效果

thenAccept(Consumer):消费效果
  1. CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
  2. CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");
  3. cf1.thenCombine(cf2, (a, b) -> a + " " + b).thenAccept(System.out::println);
复制代码
thenRun(Runnable):继续实验下一个任务,无需前面效果
  1. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
  2. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
  3. CompletableFuture<String> result = f1.thenCombine(f2, (a, b) -> a + b);
  4. System.out.println(result.get()); // AB
复制代码
非常处理

exceptionally(Function):非常处理
  1. CompletableFuture
  2.     .supplyAsync(() -> "Result")
  3.     .thenAccept(result -> System.out.println("Received: " + result));
复制代码
handle(BiFunction):同时处理正常与非常效果
  1. CompletableFuture
  2.     .supplyAsync(() -> "X")
  3.     .thenRun(() -> System.out.println("Next step executed"));
复制代码
whenComplete(BiConsumer):雷同 finally


  • 在 CompletableFuture 实验完毕后实验一个回调,无论是乐成还是非常。
  • 不会改变原来的效果或非常,仅用于处理副作用(如日志)。
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2.     if (true) throw new RuntimeException("Oops!");
  3.     return "ok";
  4. }).exceptionally(ex -> "Fallback: " + ex.getMessage());
  5. System.out.println(future.get());
复制代码
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2.     throw new RuntimeException("Error!");
  3. }).handle((result, ex) -> {
  4.     if (ex != null) return "Handled: " + ex.getMessage();
  5.     return result;
  6. });
  7. System.out.println(future.get());
复制代码
并发组合

allOf / anyOf:组合任务
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Final Result")
  2.     .whenComplete((result, ex) -> {
  3.         System.out.println("Completed with: " + result);
  4.     });
复制代码
allOf(...):等待全部任务完成

需要单独从每个任务中再 .get() 拿到效果
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2.     if (true) throw new RuntimeException("出错了");
  3.     return "成功";
  4. }).whenComplete((result, exception) -> {
  5.     if (exception != null) {
  6.         System.out.println("发生异常:" + exception.getMessage());
  7.     } else {
  8.         System.out.println("执行结果:" + result);
  9.     }
  10. });
复制代码
  1. CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2);
  2. CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2);
复制代码
anyOf(...):任一完成即触发
  1. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
  2. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
  3. CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
  4. all.thenRun(() -> System.out.println("All done")).get();
复制代码
超时控制

orTimeout(long timeout, TimeUnit unit):超时非常

如果在指定时间内没有完成,就抛出 TimeoutException 非常。
  1. CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> fetchUser());
  2. CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder());
  3. // 两个任务都完成后执行
  4. CompletableFuture<Void> bothDone = CompletableFuture.allOf(userFuture, orderFuture);
  5. bothDone.thenRun(() -> {
  6.     try {
  7.         String user = userFuture.get();
  8.         String order = orderFuture.get();
  9.         System.out.println("用户: " + user + ", 订单: " + order);
  10.     } catch (Exception e) {
  11.         e.printStackTrace();
  12.     }
  13. });
复制代码
  1. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
  2.     try { Thread.sleep(1000); } catch (InterruptedException e) {}
  3.     return "fast";
  4. });
  5. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "slow");
  6. CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
  7. System.out.println(any.get()); // 输出最快那个
复制代码
completeOnTimeout(T value, long timeout, TimeUnit unit):超时默认值

如果在指定时间内没有完成,则返回一个默认值,并完成该任务。
  1. CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
  2.     try { Thread.sleep(2000); } catch (Exception e) {}
  3.     return "late result";
  4. }).orTimeout(1, TimeUnit.SECONDS);
  5. try {
  6.     System.out.println(f.get());
  7. } catch (Exception e) {
  8.     System.out.println("Timeout: " + e.getMessage());
  9. }
复制代码
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2.     try {
  3.         Thread.sleep(3000);
  4.     } catch (InterruptedException e) {
  5.         e.printStackTrace();
  6.     }
  7.     return "执行完成";
  8. }).orTimeout(2, TimeUnit.SECONDS)
  9.   .exceptionally(ex -> "捕获到异常:" + ex.getClass().getSimpleName());
  10. System.out.println("结果:" + future.join()); // 打印“捕获到异常:TimeoutException”
复制代码
自定义线程池
  1. CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
  2.     try { Thread.sleep(2000); } catch (Exception e) {}
  3.     return "slow";
  4. }).completeOnTimeout("timeout default", 1, TimeUnit.SECONDS);
  5. System.out.println(f.get()); // timeout default
复制代码
异步任务 + 消费效果
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2.     try {
  3.         Thread.sleep(3000); // 模拟耗时任务
  4.     } catch (InterruptedException e) {
  5.         e.printStackTrace();
  6.     }
  7.     return "正常返回结果";
  8. }).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
  9. System.out.println("最终结果:" + future.join()); // 会打印“超时默认值”
复制代码
异步任务 + 转换效果(链式调用)
  1. ExecutorService pool = Executors.newFixedThreadPool(2);
  2. CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "pooled", pool);
  3. System.out.println(f.get());
  4. pool.shutdown();
复制代码
非常处理
  1. CompletableFuture<Void> future = CompletableFuture
  2.     .supplyAsync(() -> "hello")
  3.     .thenAccept(result -> System.out.println("结果是:" + result));
复制代码
多任务并发组合(allOf / anyOf)
  1. CompletableFuture<String> future = CompletableFuture
  2.     .supplyAsync(() -> "5")
  3.     .thenApply(Integer::parseInt)
  4.     .thenApply(num -> num * 2)
  5.     .thenApply(Object::toString);
复制代码
归并两个任务效果
  1. CompletableFuture<String> future = CompletableFuture
  2.     .supplyAsync(() -> {
  3.         if (true) throw new RuntimeException("出错了!");
  4.         return "success";
  5.     })
  6.     .exceptionally(ex -> {
  7.         System.out.println("异常: " + ex.getMessage());
  8.         return "默认值";
  9.     });
复制代码
自定义线程池
  1. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
  2. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
  3. // 等待全部完成
  4. CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
  5. all.join();
  6. System.out.println("结果:" + f1.join() + ", " + f2.join());
复制代码
链式异步处理
  1. CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 100);
  2. CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 200);
  3. CompletableFuture<Integer> result = f1.thenCombine(f2, Integer::sum);
  4. System.out.println(result.get()); // 输出 300
复制代码
订单处理示例
  1. ExecutorService pool = Executors.newFixedThreadPool(4);
  2. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  3.     return "线程池中的任务";
  4. }, pool);
  5. System.out.println(future.get());
  6. pool.shutdown();
复制代码
总结图谱
  1. CompletableFuture.supplyAsync(() -> "Step 1")
  2.     .thenApply(s -> s + " -> Step 2")
  3.     .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " -> Step 3"))
  4.     .thenAccept(System.out::println)
  5.     .exceptionally(ex -> {
  6.         ex.printStackTrace();
  7.         return null;
  8.     });
复制代码
什么场景适合用 Java 异步(@Async / CompletableFuture)?

场景是否适合异步?调用多个长途服务并行✅ 很适合复杂 CPU 运算耗时任务✅ 可以放到异步线程池简单业务逻辑、数据库使用❌ 不建议,同步更可控非主流程的日志、打点使用✅ 合适异步处理Java 和 .NET 异步处理对比

并行调用两个服务,提高响应速率
Spring Boot 示例(@Async + CompletableFuture)

项目结构
  1. public class OrderSystem {
  2.     @Async("dbExecutor")
  3.     public CompletableFuture<Order> saveOrder(Order order) {
  4.         // 数据库写入操作
  5.         return CompletableFuture.completedFuture(order);
  6.     }
  7.     @Async("httpExecutor")
  8.     public CompletableFuture<String> notifyLogistics(Order order) {
  9.         // 调用物流API
  10.         return CompletableFuture.completedFuture("SUCCESS");
  11.     }
  12.     public void processOrder(Order order) {
  13.         CompletableFuture<Order> saveFuture = saveOrder(order);
  14.         saveFuture.thenCompose(savedOrder ->
  15.             notifyLogistics(savedOrder)
  16.         ).exceptionally(ex -> {
  17.             log.error("物流通知失败", ex);
  18.             return "FALLBACK";
  19.         });
  20.     }
  21. }
复制代码
RemoteService.java
  1. CompletableFuture
  2. ├─ 创建任务
  3. │  ├─ runAsync() -> 无返回值
  4. │  └─ supplyAsync() -> 有返回值
  5. ├─ 处理结果
  6. │  ├─ thenApply() -> 转换
  7. │  ├─ thenAccept() -> 消费
  8. │  ├─ thenRun() -> 执行新任务
  9. │  ├─ thenCombine() -> 合并结果
  10. │  └─ thenCompose() -> 链式调用
  11. ├─ 异常处理
  12. │  ├─ exceptionally()
  13. │  ├─ handle()
  14. │  └─ whenComplete()
  15. ├─ 组合任务
  16. │  ├─ allOf()
  17. │  └─ anyOf()
  18. └─ 超时控制
  19.    ├─ orTimeout()
  20.    └─ completeOnTimeout()
复制代码
RemoteServiceImpl.java
  1. └── src
  2.     └── main
  3.         ├── java
  4.         │   ├── demo
  5.         │   │   ├── controller
  6.         │   │   │   └── AggregateController.java
  7.         │   │   ├── service
  8.         │   │   │   ├── RemoteService.java
  9.         │   │   │   └── RemoteServiceImpl.java
  10.         │   │   └── DemoApplication.java
复制代码
AggregateController.java
  1. public interface RemoteService {
  2.     @Async
  3.     CompletableFuture<String> getUserInfo();
  4.     @Async
  5.     CompletableFuture<String> getAccountInfo();
  6. }
复制代码
DemoApplication.java
  1. @Service
  2. public class RemoteServiceImpl implements RemoteService {
  3.     @Override
  4.     public CompletableFuture<String> getUserInfo() {
  5.         try {
  6.             Thread.sleep(2000); // 模拟耗时
  7.         } catch (InterruptedException e) {
  8.             throw new RuntimeException(e);
  9.         }
  10.         return CompletableFuture.completedFuture("UserInfo");
  11.     }
  12.     @Override
  13.     public CompletableFuture<String> getAccountInfo() {
  14.         try {
  15.             Thread.sleep(3000); // 模拟耗时
  16.         } catch (InterruptedException e) {
  17.             throw new RuntimeException(e);
  18.         }
  19.         return CompletableFuture.completedFuture("AccountInfo");
  20.     }
  21. }
复制代码
.NET 示例(async/await)

项目结构
  1. @RestController
  2. @RequestMapping("/api")
  3. public class AggregateController {
  4.     @Autowired
  5.     private RemoteService remoteService;
  6.     @GetMapping("/aggregate")
  7.     public ResponseEntity<String> aggregate() throws Exception {
  8.         CompletableFuture<String> userFuture = remoteService.getUserInfo();
  9.         CompletableFuture<String> accountFuture = remoteService.getAccountInfo();
  10.         // 等待所有完成
  11.         CompletableFuture.allOf(userFuture, accountFuture).join();
  12.         // 获取结果
  13.         String result = userFuture.get() + " + " + accountFuture.get();
  14.         return ResponseEntity.ok(result);
  15.     }
  16. }
复制代码
IRemoteService.cs
  1. @SpringBootApplication
  2. @EnableAsync
  3. public class DemoApplication {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(DemoApplication.class, args);
  6.     }
  7. }
复制代码
RemoteService.cs
  1. └── Controllers
  2.     └── AggregateController.cs
  3. └── Services
  4.     └── IRemoteService.cs
  5.     └── RemoteService.cs
复制代码
AggregateController.cs
  1. public interface IRemoteService {
  2.     Task<string> GetUserInfoAsync();
  3.     Task<string> GetAccountInfoAsync();
  4. }
复制代码
Java vs .NET 异步用法对比总结

方面Java(Spring Boot).NET Core(ASP.NET)异步声明方式@Async + CompletableFutureasync/await返回值类型CompletableFutureTask等待多个任务CompletableFuture.allOf()Task.WhenAll()是否阻塞.get() 会阻塞,链式不阻塞await 非阻塞简便性稍复杂(需要注解和线程池配置)极简、天然异步支持
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

来自云龙湖轮廓分明的月亮

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表