Java的CompletableFuture,Java的多线程开发

打印 上一主题 下一主题

主题 948|帖子 948|积分 2844

三、Java8的CompletableFuture,Java的多线程开发

1、CompletableFuture的常用方法


  • 以后用到再加
  1. runAsync() :开启异步(创建线程执行任务),无返回值
  2. supplyAsync() :开启异步(创建线程执行任务),有返回值
  3. thenApply() :然后应用,适用于有返回值的结果,拿着返回值再去处理。
  4. exceptionally():用于处理异步任务执行过程中出现异常的情况的一个方法:返回默认值或者一个替代的 CompletableFuture 对象,从而避免系统的崩溃或异常处理的问题。
  5. handle():类似exceptionally()
  6. get()  :阻塞线程:主要可以: ①获取线程中的异常然后处理异常、②设置等待时间
  7. join() :阻塞线程:推荐使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。他自己会抛出异常。
  8. CompletableFuture.allOf()
  9. CompletableFuture.anyOf()
复制代码

  • get() 和 join() 方法区别?

    • 都可以阻塞线程 —— 等所有任务都执行完了再执行后续代码。

  1. CompletableFuture 中的  get()  和  join()  方法都用于获取异步任务的执行结果,但是在使用时需要注意以下几点区别:
  2. 1. 抛出异常的方式不同:如果异步任务执行过程中出现异常, get()  方法会抛出 ExecutionException 异常,而  join()  方法会抛出 CompletionException 异常,这两个异常都是继承自 RuntimeException 的。
  3. 2. 方法调用限制不同: join()  方法是不可以被中断的,一旦调用就必须等待任务执行完成才能返回结果;而  get()  方法可以在调用时设置等待的超时时间,如果超时还没有获取到结果,就会抛出 TimeoutException 异常。
  4. 3. 返回结果类型不同: get()  方法返回的是异步任务的执行结果,该结果是泛型类型 T 的,需要强制转换才能获取真正的结果;而  join()  方法返回的是异步任务的执行结果,该结果是泛型类型 T,不需要强制转换。
  5. 4. 推荐使用方式不同:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。
  6. 综上所述, get()  方法和  join()  方法都是获取异步任务的执行结果,但是在使用时需要根据具体场景选择使用哪个方法。如果需要获取执行结果并且不希望被中断,推荐使用  join()  方法;如果需要控制等待时间或者需要捕获异常,则可以使用  get()  方法。
复制代码

  • anyOf() 和 allOf()  的区别?
  1. CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它支持链式调用、组合和转换异步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的两个常用方法,它们的区别如下:
  2. 1. anyOf:任意一个 CompletableFuture 完成,它就会跟随这个 CompletableFuture 的结果完成,返回第一个完成的 CompletableFuture 的结果。
  3. 2. allOf:所有的 CompletableFuture 都完成时,它才会跟随它们的结果完成,返回一个空的 CompletableFuture。
  4. 简而言之,anyOf 和 allOf 的最大区别是:anyOf 任意一个 CompletableFuture 完成就跟着它的结果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,并返回一个空的 CompletableFuture。
  5. 举例来说,如果有三个 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能会返回一个字符串,而 f3 可能会返回一个整数,那么:
  6. - anyOf(f1, f2, f3) 的结果是 f1、f2、f3 中任意一个 CompletableFuture 的结果;
  7. - allOf(f1, f2, f3) 的结果是一个空的 CompletableFuture,它的完成状态表示 f1、f2、f3 是否全部完成。
  8. 总之,anyOf 和 allOf 在实际使用中可以根据不同的需求来选择,它们都是 CompletableFuture 中非常强大的组合操作。
复制代码
2、使用CompletableFuture

2.1、实体类准备
  1. package com.cc.md.entity;
  2. import lombok.Data;
  3. /**
  4. * @author CC
  5. * @since 2023/5/24 0024
  6. */
  7. @Data
  8. public class UserCs {
  9.     private String name;
  10.     private Integer age;
  11. }
复制代码
2.2、常用方式


  • 无返回值推荐:开启多线程——无返回值的——阻塞:test06
  1.     @Resource(name = "myIoThreadPool")
  2.     private ThreadPoolTaskExecutor myIoThreadPool;
  3.    
  4.     //CompletableFuture开启多线程——无返回值的
  5.     @Test
  6.     public void test06() throws Exception {
  7.         List<CompletableFuture<Void>> futures = new ArrayList<>();
  8.         //循环,模仿很多任务
  9.         for (int i = 0; i < 1000; i++) {
  10.             int finalI = i;
  11.             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  12.                 //第一批创建的线程数
  13.                 log.info("打印:{}", finalI);
  14.                 //模仿io流耗时
  15.                 try {
  16.                     Thread.sleep(5000);
  17.                 } catch (InterruptedException e) {
  18.                     throw new RuntimeException(e);
  19.                 }
  20.             }, myIoThreadPool);
  21.             futures.add(future);
  22.         }
  23.         //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码
  24.         //如果不阻塞,上面的相当于异步执行了。
  25.         //阻塞方式1:可以获取返回的异常、设置等待时间
  26. //        futures.forEach(future -> {
  27. //            try {
  28. //                future.get();
  29. //            } catch (Exception e) {
  30. //                throw new RuntimeException(e);
  31. //            }
  32. //        });
  33.         //阻塞方式2(推荐)
  34.         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
  35.         log.info("打印:都执行完了。。。");
  36.     }
复制代码

  • 有返回值推荐:开启多线程——有返回值的,返回一个新的List——阻塞——使用stream流的map:test09

    • test07、test08 可以转化为 test09 (现在这个)
    • 可以返回任务类型的值,不一定要返回下面的user对象。

  1.     @Resource(name = "myIoThreadPool")
  2.     private ThreadPoolTaskExecutor myIoThreadPool;
  3.    
  4.     //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
  5.     //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值
  6.     //使用stream流的map + CompletableFuture.supplyAsync()
  7.     @Test
  8.     public void test09() throws Exception {
  9.         //先获取数据,需要处理的任务。
  10.         List<UserCs> users = this.getUserCs();
  11.         //莫法处理任务
  12.         List<CompletableFuture<UserCs>> futures = users.stream()
  13.                 .map(user -> CompletableFuture.supplyAsync(() -> {
  14.                     // 处理数据
  15.                     user.setName(user.getName() + "-改");
  16.                     log.info("打印-改:{}", user.getName());
  17.                     // 其他的业务逻辑。。。
  18.                     return user;
  19.                 }, myIoThreadPool)).collect(Collectors.toList());
  20.         //获取futures
  21.         List<UserCs> endList = futures.stream()
  22.                 //阻塞所有线程
  23.                 .map(CompletableFuture::join)
  24.                 //取age大于10的用户
  25.                 .filter(user -> user.getAge() > 10)
  26.                 //按照age升序排序
  27.                 .sorted(Comparator.comparing(UserCs::getAge))
  28.                 .collect(Collectors.toList());
  29.         log.info("打印:都执行完了。。。{}", endList);
  30.     }
复制代码
2.3、异常处理


  • exceptionally
  • handle
  1.         //CompletableFuture 异常处理
  2.     @Test
  3.     public void test10() throws Exception {
  4.         //先获取数据,需要处理的任务。
  5.         List<UserCs> users = this.getUserCs();
  6.         //莫法处理任务
  7.         List<CompletableFuture<UserCs>> futures = users.stream()
  8.                 .map(user -> CompletableFuture.supplyAsync(() -> {
  9.                         if (user.getAge() > 5){
  10.                             int a = 1/0;
  11.                         }
  12.                         // 处理数据
  13.                         user.setName(user.getName() + "-改");
  14.                         log.info("打印-改:{}", user.getName());
  15.                         // 其他的业务逻辑。。。
  16.                         return user;
  17.                     }, myIoThreadPool)
  18.                     //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。
  19.                     .exceptionally(throwable -> {
  20.                         //可以直接获取user
  21.                         System.out.println("异常了:" + user);
  22.                         //处理异常的方法……
  23.                         //1还可以进行业务处理……比如将异常数据存起来,然后导出……
  24.                         //2返回默认值,如:user、null
  25.                         //return user;
  26.                         //3抛出异常
  27.                         throw new RuntimeException(throwable.getMessage());
  28.                     })
  29.                     //处理异常方式2:类似exceptionally(不推荐)
  30. //                    .handle((userCs, throwable) -> {
  31. //                        System.out.println("handle:" + user);
  32. //                        if (throwable != null) {
  33. //                            // 处理异常
  34. //                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);
  35. //                            // 返回原始数据
  36. //                            return userCs;
  37. //                        } else {
  38. //                            // 返回正常数据
  39. //                            return userCs;
  40. //                        }
  41. //                    })
  42.                 )
  43.                 .collect(Collectors.toList());
  44.         //获取futures
  45.         List<UserCs> endList = futures.stream()
  46.                 //阻塞所有线程
  47.                 .map(CompletableFuture::join)
  48.                 //取age大于10的用户
  49.                 .filter(user -> user.getAge() > 10)
  50.                 //按照age升序排序
  51.                 .sorted(Comparator.comparing(UserCs::getAge))
  52.                 .collect(Collectors.toList());
  53.         log.info("打印:都执行完了。。。{}", endList);
  54.     }
复制代码
2.4、CompletableFuture的使用测试

1、推荐使用:test03、test05、test09、test10、test11
2、test07、test08就是test09的前身。

  • test01:获取当前电脑(服务器)的cpu核数
  • test02:线程池原始的使用(不推荐直接这样用)
  • test03:开启异步1 —— @Async
  • test04:开启异步2 —— CompletableFuture.runAsync()
  • test05:开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有异步方法,一起提交

      1. 相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。
      复制代码

  • test052:开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join
  • test06:CompletableFuture开启多线程——无返回值的
  • test07:CompletableFuture开启多线程——无返回值的——构建一个新List

      1. 1、相当于多线程执行任务,然后把结果插入到List中
      2. 2、接收多线程的List必须是线程安全的,ArrayList线程不安全
      3.    线程安全的List —— CopyOnWriteArrayList 替代 ArrayList
      复制代码

  • test08:CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况(基本和test07是一个方法)
  • test09:CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
  • test10:CompletableFuture 异常处理。相当于是 test09的增强,处理异常
  • test11:CompletableFuture 异常处理:如果出现异常就舍弃任务。

      1. 1、想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?
      2. 2、发现了异常任务也就完了。而且打印了异常,相当于返回了异常。
      3. 3、未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture
      复制代码

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  1. package com.cc.md;
  2. import com.cc.md.entity.UserCs;
  3. import com.cc.md.service.IAsyncService;
  4. import org.junit.jupiter.api.Test;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import javax.annotation.Resource;
  10. import java.util.*;
  11. import java.util.concurrent.CompletableFuture;
  12. import java.util.concurrent.CopyOnWriteArrayList;
  13. import java.util.concurrent.ForkJoinPool;
  14. import java.util.concurrent.TimeUnit;
  15. import java.util.stream.Collectors;
  16. @SpringBootTest
  17. class Test01 {
  18.     private static final Logger log = LoggerFactory.getLogger(Test01.class);
  19.     @Resource(name = "myIoThreadPool")
  20.     private ThreadPoolTaskExecutor myIoThreadPool;
  21.     /**
  22.      * 异步类
  23.      */
  24.     @Resource
  25.     private IAsyncService asyncService;
  26.     @Test
  27.     void test01() {
  28.         //获取当前jdk能调用的CPU个数(当前服务器的处理器个数)
  29.         int i = Runtime.getRuntime().availableProcessors();
  30.         System.out.println(i);
  31.     }
  32.     //线程池原始的使用
  33.     @Test
  34.     void test02() {
  35.         try {
  36.             for (int i = 0; i < 1000; i++) {
  37.                 int finalI = i;
  38.                 myIoThreadPool.submit(() -> {
  39.                     //第一批创建的线程数
  40.                     log.info("打印:{}", finalI);
  41.                     //模仿io流耗时
  42.                     try {
  43.                         Thread.sleep(5000);
  44.                     } catch (InterruptedException e) {
  45.                         throw new RuntimeException(e);
  46.                     }
  47.                 });
  48.             }
  49.         }catch(Exception e){
  50.             throw new RuntimeException(e);
  51.         }finally {
  52.             myIoThreadPool.shutdown();
  53.         }
  54.     }
  55.     //开启异步1 —— @Async
  56.     @Test
  57.     public void test03() throws Exception {
  58.         log.info("打印:{}", "异步测试的-主方法1");
  59.         asyncService.async1();
  60.         asyncService.async2();
  61.         //不会等待异步方法执行,直接返回前端数据
  62.         log.info("打印:{}", "异步测试的-主方法2");
  63.     }
  64.     //开启异步2 —— CompletableFuture.runAsync()
  65.     @Test
  66.     public void test04() throws Exception {
  67.         log.info("打印:{}", "异步测试的-主方法1");
  68.         CompletableFuture.runAsync(() -> {
  69.             log.info("打印:{}", "异步方法1!");
  70.             //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  71.             this.async2("异步方法1!-end");
  72.         }, myIoThreadPool);
  73.         //不会等待异步方法执行,直接返回前端数据
  74.         log.info("打印:{}", "异步测试的-主方法2");
  75.     }
  76.     //异步需要执行的方法,可以写在同一个类中。
  77.     private void async2(String msg) {
  78.         //模仿io流耗时
  79.         try {
  80.             Thread.sleep(5000);
  81.         } catch (InterruptedException e) {
  82.             throw new RuntimeException(e);
  83.         }
  84.         log.info("打印:{}", msg);
  85.     }
  86.     //开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有异步方法,一起提交
  87.     //相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。
  88.     @Test
  89.     public void test05() throws Exception {
  90.         log.info("打印:{}", "异步测试的-主方法1");
  91.         //异步执行1
  92.         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  93.             log.info("打印:{}", "异步方法1!");
  94.             //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  95.             this.async2("异步方法1-end");
  96.             return "异步方法1";
  97.         }, myIoThreadPool);
  98.         //异步执行2
  99.         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  100.             log.info("打印:{}", "异步方法2!");
  101.             //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  102.             this.async2("异步方法2-end");
  103.             return "异步方法2";
  104.         }, myIoThreadPool);
  105.         //异步执行3,不用我们自己的线程池 —— 用的就是系统自带的 ForkJoinPool 线程池
  106.         CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
  107.             log.info("打印:{}", "异步方法3!");
  108.             //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  109.             this.async2("异步方法3-end");
  110.         });
  111.         //阻塞所有异步方法,一起提交后才走下面的代码
  112.         CompletableFuture.allOf(future1, future2, future3).join();
  113.         log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");
  114.     }
  115.     //开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join
  116.     // CompletableFuture 的 get 和 join 方法区别:
  117.     // get:①可以获取线程中的异常、②设置等待时间
  118.     // join:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。
  119.     @Test
  120.     public void test052() throws Exception {
  121.         log.info("打印:{}", "异步测试的-主方法1");
  122.         //异步执行1
  123.         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  124.             log.info("打印:{}", "异步方法1!");
  125.             // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  126.             String str = "异步方法1-end";
  127.             this.async2(str);
  128.             return str;
  129.         }, myIoThreadPool);
  130.         // 异步执行2 - 无返回值 —— 分开写的方式
  131.         CompletableFuture<Void> future2 = future1.thenAccept(str1 -> {
  132.             log.info("打印:{}", "异步方法2!");
  133.             // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  134.             this.async2(String.format("%s-加-异步方法2! - 无返回值 - ",str1));
  135.         });
  136.         // 异步执行3 - 有返回值 —— 分开写future1,连写future3方式
  137.         CompletableFuture<String> future3 = future1.thenApply(str2 -> {
  138.             log.info("打印:{}", "异步方法3!");
  139.             // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
  140.             this.async2(String.format("%s-加-异步方法3! - 有返回值 - ", str2));
  141.             return "异步执行3 - 有返回值 ";
  142.             //连写的方式。
  143.         }).thenApply(str3 -> {
  144.             String format = String.format("%s- end", str3);
  145.             log.error("异步3然后应用 - {}", format);
  146.             //返回后面的应用
  147.             return format;
  148.         });
  149.         // 获取future3的返回值:
  150.         //如果需要捕获异常、设置等待超时时间,则用get
  151.         log.info("future3的返回值(不阻塞):{}", future3.get());
  152. //        log.info("future3的返回值(不阻塞-设置等待时间,超时报错:TimeoutException):{}",
  153. //                future3.get(2, TimeUnit.SECONDS));
  154.         //推荐使用 join方法
  155. //        log.info("future3的返回值(阻塞):{}", future3.join());
  156.         //阻塞所有异步方法,一起提交后才走下面的代码
  157.         CompletableFuture.allOf(future1, future2).join();
  158.         log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");
  159.     }
  160.     //CompletableFuture开启多线程——无返回值的
  161.     @Test
  162.     public void test06() throws Exception {
  163.         List<CompletableFuture<Void>> futures = new ArrayList<>();
  164.         //循环,模仿很多任务
  165.         for (int i = 0; i < 1000; i++) {
  166.             int finalI = i;
  167.             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  168.                 //第一批创建的线程数
  169.                 log.info("打印:{}", finalI);
  170.                 //模仿io流耗时
  171.                 try {
  172.                     Thread.sleep(5000);
  173.                 } catch (InterruptedException e) {
  174.                     throw new RuntimeException(e);
  175.                 }
  176.             }, myIoThreadPool);
  177.             futures.add(future);
  178.         }
  179.         //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码
  180.         //如果不阻塞,上面的相当于异步执行了。
  181.         //阻塞方式1:可以获取返回的异常、设置等待时间
  182. //        futures.forEach(future -> {
  183. //            try {
  184. //                future.get();
  185. //            } catch (Exception e) {
  186. //                throw new RuntimeException(e);
  187. //            }
  188. //        });
  189.         //阻塞方式2(推荐)
  190.         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
  191.         log.info("打印:都执行完了。。。");
  192.     }
  193.     //CompletableFuture开启多线程——无返回值的——构建一个新List
  194.     //相当于多线程执行任务,然后把结果插入到List中
  195.     //接收多线程的List必须是线程安全的,ArrayList线程不安全
  196.     //线程安全的List —— CopyOnWriteArrayList 替代 ArrayList
  197.     @Test
  198.     public void test07() throws Exception {
  199.         List<CompletableFuture<Void>> futures = new ArrayList<>();
  200.         //存数据的List
  201.         List<UserCs> addList = new CopyOnWriteArrayList<>();
  202.         //循环,模仿很多任务
  203.         for (int i = 0; i < 1000; i++) {
  204.             int finalI = i;
  205.             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  206.                 log.info("打印:{}", finalI);
  207.                 UserCs userCs = new UserCs();
  208.                 userCs.setName(String.format("姓名-%s", finalI));
  209.                 userCs.setAge(finalI);
  210.                 addList.add(userCs);
  211.             }, myIoThreadPool);
  212.             futures.add(future);
  213.         }
  214.         //阻塞
  215.         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
  216.         //返回新的List:endList,取age大于10的用户
  217.         List<UserCs> endList = addList.stream()
  218.                 .filter(user -> user.getAge() > 10)
  219.                 //按照age升序排序
  220.                 .sorted(Comparator.comparing(UserCs::getAge))
  221.                 .collect(Collectors.toList());
  222.         log.info("打印:都执行完了。。。{}", endList);
  223.     }
  224.     //CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况
  225.     //用CopyOnWriteArrayList 替代 ArrayList接收
  226.     @Test
  227.     public void test08() throws Exception {
  228.         //先获取数据,需要处理的任务。
  229.         List<UserCs> users = this.getUserCs();
  230.         //开启多线程
  231.         List<CompletableFuture<Void>> futures = new ArrayList<>();
  232.         //存数据的List
  233.         List<UserCs> addList = new CopyOnWriteArrayList<>();
  234.         //莫法处理任务
  235.         users.forEach(user -> {
  236.             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  237.                 //添加数据
  238.                 user.setName(user.getName() + "-改");
  239.                 addList.add(user);
  240.                 log.info("打印-改:{}", user.getName());
  241.                 //其他的业务逻辑。。。
  242.             }, myIoThreadPool);
  243.             futures.add(future);
  244.         });
  245.         //阻塞
  246.         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
  247.         //返回新的List:endList
  248.         List<UserCs> endList = addList.stream()
  249.                 .filter(user -> user.getAge() > 10)
  250.                 //按照age升序排序
  251.                 .sorted(Comparator.comparing(UserCs::getAge))
  252.                 .collect(Collectors.toList());
  253.         log.info("打印:都执行完了。。。{}", endList);
  254.     }
  255.     //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
  256.     //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值
  257.     //使用stream流的map + CompletableFuture.supplyAsync()
  258.     @Test
  259.     public void test09() throws Exception {
  260.         //先获取数据,需要处理的任务。
  261.         List<UserCs> users = this.getUserCs();
  262.         //莫法处理任务
  263.         List<CompletableFuture<UserCs>> futures = users.stream()
  264.                 .map(user -> CompletableFuture.supplyAsync(() -> {
  265.                     // 处理数据
  266.                     user.setName(user.getName() + "-改");
  267.                     log.info("打印-改:{}", user.getName());
  268.                     // 其他的业务逻辑。。。
  269.                     return user;
  270.                 }, myIoThreadPool)).collect(Collectors.toList());
  271.         //获取futures
  272.         List<UserCs> endList = futures.stream()
  273.                 //阻塞所有线程
  274.                 .map(CompletableFuture::join)
  275.                 //取age大于10的用户
  276.                 .filter(user -> user.getAge() > 10)
  277.                 //按照age升序排序
  278.                 .sorted(Comparator.comparing(UserCs::getAge))
  279.                 .collect(Collectors.toList());
  280.         log.info("打印:都执行完了。。。{}", endList);
  281.     }
  282.     //基础数据
  283.     private List<UserCs> getUserCs() {
  284.         List<UserCs> users = new ArrayList<>();
  285.         for (int i = 0; i < 1000; i++) {
  286.             UserCs userCs = new UserCs();
  287.             userCs.setName(String.format("姓名-%s", i));
  288.             userCs.setAge(i);
  289.             users.add(userCs);
  290.         }
  291.         return users;
  292.     }
  293.     //CompletableFuture 异常处理
  294.     @Test
  295.     public void test10() throws Exception {
  296.         //先获取数据,需要处理的任务。
  297.         List<UserCs> users = this.getUserCs();
  298.         //莫法处理任务
  299.         List<CompletableFuture<UserCs>> futures = users.stream()
  300.                 .map(user -> CompletableFuture.supplyAsync(() -> {
  301.                         if (user.getAge() > 5){
  302.                             int a = 1/0;
  303.                         }
  304.                         // 处理数据
  305.                         user.setName(user.getName() + "-改");
  306.                         log.info("打印-改:{}", user.getName());
  307.                         // 其他的业务逻辑。。。
  308.                         return user;
  309.                     }, myIoThreadPool)
  310.                     //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。
  311.                     .exceptionally(throwable -> {
  312.                         //可以直接获取user
  313.                         System.out.println("异常了:" + user);
  314.                         //处理异常的方法……
  315.                         //1还可以进行业务处理……比如将异常数据存起来,然后导出……
  316.                         //2返回默认值,如:user、null
  317.                         //return user;
  318.                         //3抛出异常
  319.                         throw new RuntimeException(throwable.getMessage());
  320.                     })
  321.                     //处理异常方式2:类似exceptionally(不推荐)
  322. //                    .handle((userCs, throwable) -> {
  323. //                        System.out.println("handle:" + user);
  324. //                        if (throwable != null) {
  325. //                            // 处理异常
  326. //                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);
  327. //                            // 返回原始数据
  328. //                            return userCs;
  329. //                        } else {
  330. //                            // 返回正常数据
  331. //                            return userCs;
  332. //                        }
  333. //                    })
  334.                 )
  335.                 .collect(Collectors.toList());
  336.         //获取futures
  337.         List<UserCs> endList = futures.stream()
  338.                 //阻塞所有线程
  339.                 .map(CompletableFuture::join)
  340.                 //取age大于10的用户
  341.                 .filter(user -> user.getAge() > 10)
  342.                 //按照age升序排序
  343.                 .sorted(Comparator.comparing(UserCs::getAge))
  344.                 .collect(Collectors.toList());
  345.         log.info("打印:都执行完了。。。{}", endList);
  346.     }
  347.     //CompletableFuture 异常处理:如果出现异常就舍弃任务。
  348.     // 想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?
  349.     // 发现了异常任务也就完了。而且打印了异常,相当于返回了异常。
  350.     // 未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture
  351.     @Test
  352.     public void test11() {
  353.         List<UserCs> users = getUserCs();
  354.         List<CompletableFuture<UserCs>> futures = users.stream()
  355.                 .map(user -> CompletableFuture.supplyAsync(() -> {
  356.                             if (user.getAge() > 15) {
  357.                                 int a = 1 / 0;
  358.                             }
  359.                             user.setName(user.getName() + "-改");
  360.                             log.info("打印-改:{}", user.getName());
  361.                             return user;
  362.                         }, myIoThreadPool)
  363.                         //处理异常
  364.                         .exceptionally(throwable -> {
  365.                             //其他处理异常的逻辑
  366.                             return null;
  367.                         })
  368.                 )
  369.                 //舍弃返回的对象是null的 CompletableFuture
  370.                 .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());
  371.         //获取futures
  372.         List<UserCs> endList = futures.stream()
  373.                 //阻塞所有线程
  374.                 .map(CompletableFuture::join)
  375.                 //取age大于10的用户
  376.                 .filter(user -> user.getAge() > 10)
  377.                 //按照age升序排序
  378.                 .sorted(Comparator.comparing(UserCs::getAge))
  379.                 .collect(Collectors.toList());
  380.         log.info("打印:都执行完了。。。{}", endList);
  381.     }
  382. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表