CompletableFuture异步回调

打印 上一主题 下一主题

主题 794|帖子 794|积分 2382

CompletableFuture异步回调

CompletableFuture简介

CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture实现了Future,CompletionStage接口,实现了Future接口可以兼容线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。
Futrue和CompletableFuture

Future在Java里面,通过用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我么会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,该有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Futrue缺点

1.不支持手动完成。2.不支持进一步的非阻塞调用。3.不支持链式调用。4.不支持多个Future合并。5.不支持异步处理。
CompletableFuture类的使用案例

CompletableFuture01
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. /**
  5. * 演示CompletableFuture
  6. * @author 长名06
  7. * @version 1.0
  8. */
  9. public class CompletableFuture01 {
  10.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  11.         CompletableFuture<String> future = new CompletableFuture<>();
  12.         new Thread(() -> {
  13.             System.out.println("子线程开始干活");
  14.             try {
  15.                 //子线程沉睡3s
  16.                 Thread.sleep(3000);
  17.             } catch (InterruptedException e) {
  18.                 e.printStackTrace();
  19.             }
  20.             //完成future任务
  21.             future.complete("success");
  22.         },"A").start();
  23.         System.out.println("主线程调用get方法获取结果为:" + future.get());
  24.         System.out.println("主线程完成,阻塞结束");
  25.     }
  26. }
复制代码
CompletableFuture02
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. /**
  5. * @author 长名06
  6. * @version 1.0
  7. */
  8. public class CompletableFuture02 {
  9.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  10.         //异步调用,无返回值
  11.         CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
  12.             try {
  13.                 Thread.sleep(3000);
  14.             } catch (InterruptedException e) {
  15.                 e.printStackTrace();
  16.             }
  17.             System.out.println(Thread.currentThread().getName() + "执行runSync()");
  18.         });
  19.         completableFuture1.get();
  20.         //异步调用,有返回值
  21.         CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
  22.             System.out.println(Thread.currentThread().getName() + "执行supplyAsync()");
  23. //            int i = 1/0;
  24.             return 1024;
  25.         });
  26.         completableFuture2.whenComplete((t, u) -> {
  27.             System.out.println("----t=" + t);//t参数,是执行的返回值
  28.             System.out.println("----u=" + u);//异常信息
  29.         }).get();
  30. //        System.out.println(Runtime.getRuntime().availableProcessors());
  31.     }
  32. }
复制代码
CompletableFuture03
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. /**
  5. * 演示线程依赖,执行api thenApply()
  6. * 一个任务,依赖于另一个任务可以使用thenApply()将两个任务(线程)串行化
  7. * 对一个数先加10 再平方
  8. * @author 长名06
  9. * @version 1.0
  10. */
  11. public class CompletableFuture03 {
  12.     public static Integer num = 10;
  13.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  14.         System.out.println(Thread.currentThread().getName() + "主线程开始");
  15.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  16.             System.out.println("加10任务开启");
  17.             num += 10;
  18.             return num;
  19.         }).thenApply(i -> num * num);
  20.         Integer integer = future.get();
  21.         System.out.println("主线程结束,子线程的结果为" + integer);
  22.     }
  23. }
复制代码
CompletableFuture04
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.function.Consumer;
  5. import java.util.function.Function;
  6. /**
  7. * 消费处理结果
  8. * thenAccept()方法,接收任务的处理结果,并消费结果,不返回结果了
  9. * @author 长名06
  10. * @version 1.0
  11. */
  12. public class CompletableFuture04 {
  13.     public static Integer num = 10;
  14.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  15.         System.out.println(Thread.currentThread().getName() + "主线程开始");
  16.         CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
  17.             System.out.println("加10任务开启");
  18.             num += 10;
  19.             return num;
  20.         }).thenApply(new Function<Integer, Integer>() {
  21.             @Override
  22.             public Integer apply(Integer integer) {
  23.                 return num * num;
  24.             }
  25.         }).thenAccept(new Consumer<Integer>() {
  26.             @Override
  27.             public void accept(Integer i) {
  28.                 System.out.println("子线程全部处理完成,最后调用了accept方法,消费了结果" + i);
  29.             }
  30.         });
  31.     }
  32. }
复制代码
CompletableFuture05
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.function.Consumer;
  5. import java.util.function.Function;
  6. /**
  7. * 异常处理
  8. * @author 长名06
  9. * @version 1.0
  10. */
  11. public class CompletableFuture05 {
  12.     public static Integer num = 10;
  13.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  14.         System.out.println(Thread.currentThread().getName() + "主线程开始");
  15.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  16.             int i = 1/0;//模拟异常
  17.             System.out.println("加10任务开启");
  18.             num += 10;
  19.             return num;
  20.         }).exceptionally(new Function<Throwable, Integer>() {
  21.             @Override
  22.             public Integer apply(Throwable ex) {
  23.                 System.out.println(ex.getMessage());
  24.                 return -1;
  25.             }
  26.         });
  27.     }
  28. }
复制代码
CompletableFuture06
  1. package com.shaonian.juc.completable;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.function.BiFunction;
  5. import java.util.function.Function;
  6. /**
  7. * 消费结果,同时处理异常
  8. * handle类似与thenAccept/thenRun方法,是最后一步结果的调用,但是同时可以处理异常
  9. * @author 长名06
  10. * @version 1.0
  11. */
  12. public class CompletableFuture06 {
  13.     public static Integer num = 10;
  14.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  15.         System.out.println(Thread.currentThread().getName() + "主线程开始");
  16.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  17.                 int i = 1/0;
  18.                 System.out.println("加10任务开启");
  19.                 num += 10;
  20.             return num;
  21.         }).handle(new BiFunction<Integer, Throwable, Integer>() {
  22.             @Override
  23.             public Integer apply(Integer i, Throwable ex) {
  24.                 System.out.println("进入了handle方法");
  25.                 if(ex != null){
  26.                     System.out.println("发生了异常,内容为" + ex.getMessage());
  27.                     return -1;
  28.                 }else{
  29.                     System.out.println("正常执行,结果为" + i);
  30.                     return i;
  31.                 }
  32.             }
  33.         });
  34.     }
  35. }
复制代码
CompletableFuture07
  1. package com.shaonian.juc.completable;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.CompletableFuture;
  5. import java.util.concurrent.CompletionStage;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.function.BiFunction;
  8. import java.util.function.Function;
  9. /**
  10. * 两个CompletableFuture结果的合并
  11. * @author 长名06
  12. * @version 1.0
  13. */
  14. public class CompletableFuture07 {
  15.     public static Integer num = 10;
  16.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  17.         //有依赖关系的合并
  18.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  19.             System.out.println("加10任务开启");
  20.             num += 10;
  21.             return num;
  22.         });
  23.         //合并
  24.         CompletableFuture<Integer> future2 = future.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
  25.             @Override
  26.             public CompletionStage<Integer> apply(Integer i) {
  27.                 return CompletableFuture.supplyAsync(() -> {
  28.                     return i + 1;
  29.                 });
  30.             }
  31.         });
  32.         System.out.println(future.get());
  33.         System.out.println(future2.get());
  34.         //无依赖的任务合并
  35.         CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  36.             System.out.println("加10任务开启");
  37.             num += 10;
  38.             return num;
  39.         });
  40.         CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  41.             System.out.println("乘10任务开启");
  42.             num *= 10;
  43.             return num;
  44.         });
  45.         //合并两个结果
  46.         CompletableFuture<Object> future3 = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
  47.             @Override
  48.             public List<Integer> apply(Integer result1, Integer result2) {
  49.                 ArrayList<Integer> list = new ArrayList<>();
  50.                 list.add(result1);
  51.                 list.add(result2);
  52.                 return list;
  53.             }
  54.         });
  55.         System.out.println("合并结果为" + future3.get());
  56.     }
  57. }
复制代码
CompletableFuture08
  1. package com.shaonian.juc.completable;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.CompletableFuture;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.stream.Collectors;
  7. /**
  8. * 多个独立任务的合并 allOf
  9. * @author 长名06
  10. * @version 1.0
  11. */
  12. public class CompletableFuture08 {
  13.     public static Integer num = 10;
  14.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  15.         List<CompletableFuture<Integer>> list = new ArrayList<>();
  16.         CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  17.             System.out.println("加10任务开启");
  18.             num += 10;
  19.             return num;
  20.         });
  21.         list.add(job1);
  22.         CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  23.             System.out.println("乘10任务开启");
  24.             num *= 10;
  25.             return num;
  26.         });
  27.         list.add(job2);
  28.         CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  29.             System.out.println("减10任务开启");
  30.             num -= 10;
  31.             return num;
  32.         });
  33.         list.add(job3);
  34.         CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  35.             System.out.println("除10任务开启");
  36.             num /= 10;
  37.             return num;
  38.         });
  39.         list.add(job4);
  40.         //使用allOf需注意,输入也会执行任务,但是无法获取到结果
  41.         //allOf需要等所有的任务执行完毕
  42.         /**
  43.          * 返回值是CompletableFuture<Void>类型
  44.          *  public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
  45.          *         return andTree(cfs, 0, cfs.length - 1);
  46.          *  }
  47.          */
  48. //        CompletableFuture<Void> allJob = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
  49. //        System.out.println(allJob.get());
  50.         //也可以使用 join的形式,执行,可以获取结果
  51.         List<Integer> allResult = list.stream().map(CompletableFuture::join)
  52.                 .collect(Collectors.toList());
  53.         System.out.println(allResult);
  54.     }
  55. }
复制代码
CompletableFuture09
  1. package com.shaonian.juc.completable;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.CompletableFuture;
  5. import java.util.concurrent.ExecutionException;
  6. /**
  7. * anyOf
  8. * @author 长名06
  9. * @version 1.0
  10. */
  11. public class CompletableFuture09 {
  12.     public static Integer num = 10;
  13.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  14.         List<CompletableFuture<Integer>> list = new ArrayList<>();
  15.         CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  16.             try {
  17.                 Thread.sleep(5000);
  18.             } catch (InterruptedException e) {
  19.                 e.printStackTrace();
  20.             }
  21.             System.out.println("加10任务开启");
  22.             num += 10;
  23.             return num;
  24.         });
  25.         list.add(job1);
  26.         CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  27.             try {
  28.                 Thread.sleep(4000);
  29.             } catch (InterruptedException e) {
  30.                 e.printStackTrace();
  31.             }
  32.             System.out.println("乘10任务开启");
  33.             num *= 10;
  34.             return num;
  35.         });
  36.         list.add(job2);
  37.         CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  38.             try {
  39.                 Thread.sleep(3000);
  40.             } catch (InterruptedException e) {
  41.                 e.printStackTrace();
  42.             }
  43.             System.out.println("减10任务开启");
  44.             num -= 10;
  45.             return num;
  46.         });
  47.         list.add(job3);
  48.         CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  49.             try {
  50.                 Thread.sleep(2000);
  51.             } catch (InterruptedException e) {
  52.                 e.printStackTrace();
  53.             }
  54.             System.out.println("除10任务开启");
  55.             num /= 10;
  56.             return num;
  57.         });
  58.         list.add(job4);
  59.         //anyOf,这里只要有一个job执行完毕,就结束所有的任务执行,不需要等待所有的job执行完毕
  60.         //但是这个很鸡肋,因为如果不要执行所有的任务,就没必要开启一个CompletableFuture了
  61.         //也可以适用于竞争的场景,先执行成功的获取结果,其他的不再竞争了
  62.         CompletableFuture<Object> allJob = CompletableFuture.anyOf(list.toArray(new CompletableFuture[0]));
  63.         System.out.println(allJob.get());
  64.     }
  65. }
复制代码
只是为了记录自己的学习历程,且本人水平有限,不对之处,请指正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表