Java多种方法实现等待所有子线程完成再继续执行

打印 上一主题 下一主题

主题 1023|帖子 1023|积分 3069

简介

在现实世界中,我们常常需要等待其它任务完成,才能继续执行下一步。Java实现等待子线程完成再继续执行的方式很多。我们来一一查看一下。
Thread的join方法

该方法是Thread提供的方法,调用join()时,会阻塞主线程,等该Thread完成才会继续执行,代码如下:
  1. private static void threadJoin() {
  2.   List<Thread> threads = new ArrayList<>();
  3.   for (int i = 0; i < NUM; i++) {
  4.     Thread t = new Thread(new PkslowTask("Task " + i));
  5.     t.start();
  6.     threads.add(t);
  7.   }
  8.   threads.forEach(t -> {
  9.     try {
  10.       t.join();
  11.     } catch (InterruptedException e) {
  12.       throw new RuntimeException(e);
  13.     }
  14.   });
  15.   System.out.println("threadJoin Finished All Tasks...");
  16. }
复制代码
结果:
  1. Task 6 is running
  2. Task 9 is running
  3. Task 3 is running
  4. Task 4 is running
  5. Task 7 is running
  6. Task 0 is running
  7. Task 2 is running
  8. Task 1 is running
  9. Task 5 is running
  10. Task 8 is running
  11. Task 1 is completed
  12. Task 8 is completed
  13. Task 6 is completed
  14. Task 4 is completed
  15. Task 3 is completed
  16. Task 0 is completed
  17. Task 7 is completed
  18. Task 9 is completed
  19. Task 2 is completed
  20. Task 5 is completed
  21. threadJoin Finished All Tasks...
复制代码
CountDownLatch

CountDownLatch是一个很好用的并发工具,初始化时要指定线程数,如10。在子线程调用countDown()时计数减1。直到为0时,await()方法才不会阻塞。代码如下:
  1. private static void countDownLatch() {
  2.   CountDownLatch latch = new CountDownLatch(NUM);
  3.   for (int i = 0; i < NUM; i++) {
  4.     Thread t = new Thread(() -> {
  5.       System.out.println("countDownLatch running...");
  6.       try {
  7.         Thread.sleep(1000);
  8.         System.out.println("countDownLatch Finished...");
  9.         latch.countDown();
  10.       } catch (InterruptedException e) {
  11.         throw new RuntimeException(e);
  12.       }
  13.     });
  14.     t.start();
  15.   }
  16.   try {
  17.     latch.await();
  18.   } catch (InterruptedException e) {
  19.     throw new RuntimeException(e);
  20.   }
  21.   System.out.println("countDownLatch Finished All Tasks...");
  22. }
复制代码
结果:
  1. countDownLatch running...
  2. countDownLatch running...
  3. countDownLatch running...
  4. countDownLatch running...
  5. countDownLatch running...
  6. countDownLatch running...
  7. countDownLatch running...
  8. countDownLatch running...
  9. countDownLatch running...
  10. countDownLatch running...
  11. countDownLatch Finished...
  12. countDownLatch Finished...
  13. countDownLatch Finished...
  14. countDownLatch Finished...
  15. countDownLatch Finished...
  16. countDownLatch Finished...
  17. countDownLatch Finished...
  18. countDownLatch Finished...
  19. countDownLatch Finished...
  20. countDownLatch Finished...
  21. countDownLatch Finished All Tasks...
复制代码
CyclicBarrier

CyclicBarrier与CountDownLatch类似,但CyclicBarrier可重置,可重用。代码如下:
  1. private static void cyclicBarrier() {
  2.   CyclicBarrier barrier = new CyclicBarrier(NUM + 1);
  3.   for (int i = 0; i < NUM; i++) {
  4.     Thread t = new Thread(() -> {
  5.       System.out.println("cyclicBarrier running...");
  6.       try {
  7.         Thread.sleep(1000);
  8.         System.out.println("cyclicBarrier Finished...");
  9.         barrier.await();
  10.       } catch (InterruptedException | BrokenBarrierException e) {
  11.         throw new RuntimeException(e);
  12.       }
  13.     });
  14.     t.start();
  15.   }
  16.   try {
  17.     barrier.await();
  18.   } catch (InterruptedException | BrokenBarrierException e) {
  19.     throw new RuntimeException(e);
  20.   }
  21.   System.out.println("cyclicBarrier Finished All Tasks...");
  22. }
复制代码
结果:
  1. cyclicBarrier running...
  2. cyclicBarrier running...
  3. cyclicBarrier running...
  4. cyclicBarrier running...
  5. cyclicBarrier running...
  6. cyclicBarrier running...
  7. cyclicBarrier running...
  8. cyclicBarrier running...
  9. cyclicBarrier running...
  10. cyclicBarrier running...
  11. cyclicBarrier Finished...
  12. cyclicBarrier Finished...
  13. cyclicBarrier Finished...
  14. cyclicBarrier Finished...
  15. cyclicBarrier Finished...
  16. cyclicBarrier Finished...
  17. cyclicBarrier Finished...
  18. cyclicBarrier Finished...
  19. cyclicBarrier Finished...
  20. cyclicBarrier Finished...
  21. cyclicBarrier Finished All Tasks...
复制代码
executorService.isTerminated()

ExecutorService调用shutdown()方法后,可以通过方法isTerminated()来判断任务是否完成。代码如下:
  1. private static void executeServiceIsTerminated() {
  2.   ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  3.   IntStream.range(0, NUM)
  4.     .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
  5.   executorService.shutdown();
  6.   while (!executorService.isTerminated()) {
  7.     //waiting...
  8.   }
  9.   System.out.println("executeServiceIsTerminated Finished All Tasks...");
  10. }
复制代码
结果:
  1. Task 0 is running
  2. Task 2 is running
  3. Task 1 is running
  4. Task 3 is running
  5. Task 4 is running
  6. Task 0 is completed
  7. Task 2 is completed
  8. Task 5 is running
  9. Task 4 is completed
  10. Task 7 is running
  11. Task 3 is completed
  12. Task 1 is completed
  13. Task 8 is running
  14. Task 6 is running
  15. Task 9 is running
  16. Task 5 is completed
  17. Task 9 is completed
  18. Task 7 is completed
  19. Task 6 is completed
  20. Task 8 is completed
  21. executeServiceIsTerminated Finished All Tasks...
复制代码
executorService.awaitTermination

executorService.awaitTermination方法会等待任务完成,并给一个超时时间,代码如下:
  1. private static void executeServiceAwaitTermination() {
  2.   ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  3.   IntStream.range(0, NUM)
  4.     .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
  5.   executorService.shutdown();
  6.   try {
  7.     if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
  8.       executorService.shutdownNow();
  9.     }
  10.   } catch (InterruptedException e) {
  11.     throw new RuntimeException(e);
  12.   }
  13.   System.out.println("executeServiceAwaitTermination Finished All Tasks...");
  14. }
复制代码
结果:
  1. Task 0 is running
  2. Task 1 is running
  3. Task 2 is running
  4. Task 3 is running
  5. Task 4 is running
  6. Task 0 is completed
  7. Task 5 is running
  8. Task 1 is completed
  9. Task 4 is completed
  10. Task 7 is running
  11. Task 3 is completed
  12. Task 8 is running
  13. Task 2 is completed
  14. Task 9 is running
  15. Task 6 is running
  16. Task 5 is completed
  17. Task 7 is completed
  18. Task 9 is completed
  19. Task 8 is completed
  20. Task 6 is completed
  21. executeServiceAwaitTermination Finished All Tasks...
复制代码
executorService.invokeAll

使用invokeAll提交所有任务,代码如下:
  1. private static void executeServiceInvokeAll() {
  2.   ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  3.   List<Callable<Void>> tasks = new ArrayList<>();
  4.   IntStream.range(0, NUM)
  5.     .forEach(i -> tasks.add(new PkslowTask("Task " + i)));
  6.   try {
  7.     executorService.invokeAll(tasks);
  8.   } catch (InterruptedException e) {
  9.     throw new RuntimeException(e);
  10.   }
  11.   executorService.shutdown();
  12.   System.out.println("executeServiceInvokeAll Finished All Tasks...");
  13. }
复制代码
结果:
  1. Task 1 is running
  2. Task 2 is running
  3. Task 0 is running
  4. Task 3 is running
  5. Task 4 is running
  6. Task 1 is completed
  7. Task 3 is completed
  8. Task 0 is completed
  9. Task 2 is completed
  10. Task 4 is completed
  11. Task 8 is running
  12. Task 5 is running
  13. Task 6 is running
  14. Task 9 is running
  15. Task 7 is running
  16. Task 8 is completed
  17. Task 5 is completed
  18. Task 6 is completed
  19. Task 9 is completed
  20. Task 7 is completed
  21. executeServiceInvokeAll Finished All Tasks...
复制代码
ExecutorCompletionService

ExecutorCompletionService通过take()方法,会返回最早完成的任务,代码如下:
  1. private static void executorCompletionService() {
  2.   ExecutorService executorService = Executors.newFixedThreadPool(10);
  3.   CompletionService<String> service = new ExecutorCompletionService<>(executorService);
  4.   List<Callable<String>> callables = new ArrayList<>();
  5.   callables.add(new DelayedCallable(2000, "2000ms"));
  6.   callables.add(new DelayedCallable(1500, "1500ms"));
  7.   callables.add(new DelayedCallable(6000, "6000ms"));
  8.   callables.add(new DelayedCallable(2500, "2500ms"));
  9.   callables.add(new DelayedCallable(300, "300ms"));
  10.   callables.add(new DelayedCallable(3000, "3000ms"));
  11.   callables.add(new DelayedCallable(1100, "1100ms"));
  12.   callables.add(new DelayedCallable(100, "100ms"));
  13.   callables.add(new DelayedCallable(100, "100ms"));
  14.   callables.add(new DelayedCallable(100, "100ms"));
  15.   callables.forEach(service::submit);
  16.   for (int i = 0; i < NUM; i++) {
  17.     try {
  18.       Future<String> future = service.take();
  19.       System.out.println(future.get() + " task is completed");
  20.     } catch (InterruptedException | ExecutionException e) {
  21.       throw new RuntimeException(e);
  22.     }
  23.   }
  24.   System.out.println("executorCompletionService Finished All Tasks...");
  25.   executorService.shutdown();
  26.   awaitTerminationAfterShutdown(executorService);
  27. }
复制代码
这里不同任务的时长是不一样的,但会先返回最早完成的任务:
  1. 2000ms is running
  2. 2500ms is running
  3. 300ms is running
  4. 1500ms is running
  5. 6000ms is running
  6. 3000ms is running
  7. 1100ms is running
  8. 100ms is running
  9. 100ms is running
  10. 100ms is running
  11. 100ms is completed
  12. 100ms is completed
  13. 100ms task is completed
  14. 100ms task is completed
  15. 100ms is completed
  16. 100ms task is completed
  17. 300ms is completed
  18. 300ms task is completed
  19. 1100ms is completed
  20. 1100ms task is completed
  21. 1500ms is completed
  22. 1500ms task is completed
  23. 2000ms is completed
  24. 2000ms task is completed
  25. 2500ms is completed
  26. 2500ms task is completed
  27. 3000ms is completed
  28. 3000ms task is completed
  29. 6000ms is completed
  30. 6000ms task is completed
  31. executorCompletionService Finished All Tasks...
复制代码

代码

代码请看GitHub: https://github.com/LarryDpk/pkslow-samples

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表