Future和CompletableFuture区别

打印 上一主题 下一主题

主题 887|帖子 887|积分 2661


  • Future :获取异步返回的结果需要使用轮询的方式,消耗cup
    1.         ExecutorService executorService = Executors.newFixedThreadPool(10);
    2.         Future<String> future = executorService.submit(()->{
    3.             try {
    4.                 Thread.sleep(2000);
    5.             } catch (InterruptedException e) {
    6.                 e.printStackTrace();
    7.             }
    8.             return "future";
    9.         });
    10.         while(true){
    11.             if(future.isDone()){
    12.                 System.out.println(future.get());
    13.                 break;
    14.             }
    15.         }
    复制代码
  • CompletableFuture:采用观察者模式,阻塞获取异步返回的结果,性能得到优化
    1.                  System.out.println("=============CompletableFuture===================");
    2.         CompletableFuture testFuture1 = CompletableFuture.supplyAsync(()->{
    3.             return "丽丽1";
    4.         }).thenApply((element)->{
    5.             System.out.println("testFuture1后续操作:"+element);
    6.             return "丽丽2";
    7.         });
    8.         System.out.println(testFuture1.get());
    9.         System.out.println("=============CompletableFuture===================");
    10.         CompletableFuture testFuture2 = CompletableFuture.supplyAsync(()->{
    11.             return "丽丽1";
    12.         }).thenAccept((element)->{
    13.             System.out.println("testFuture2后续操作:"+element);
    14.         });
    15.         System.out.println(testFuture2.get());
    复制代码
  • CompletableFuture的使用明细
    1. * runAsync 无返回值
    2. * supplyAsync 有返回值
    3. *
    4. * thenAccept 无返回值
    5. * thenApply 有返回值
    6. * thenRun 不关心上一步执行结果,执行下一个操作
    7. * get() 为阻塞获取 可设置超时时间 避免长时间阻塞
    复制代码
    1. 实现接口 AsyncFunction 用于请求分发
    2. 定义一个callback回调函数,该函数用于取出异步请求的返回结果,并将返回的结果传递给ResultFuture
    3. 对DataStream的数据使用Async操作
    复制代码

    • 例子
      1. /**
      2. * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
      3.   *  通过向数据库发送异步请求并设置回调方法
      4. */
      5. class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
      6.     /** The database specific client that can issue concurrent requests with callbacks
      7.      可以异步请求的特定数据库的客户端 */
      8.     private transient DatabaseClient client;
      9.     @Override
      10.     public void open(Configuration parameters) throws Exception {
      11.         client = new DatabaseClient(host, post, credentials);
      12.     }
      13.     @Override
      14.     public void close() throws Exception {
      15.         client.close();
      16.     }
      17.     @Override
      18.     public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
      19.         // issue the asynchronous request, receive a future for result
      20.         // 发起一个异步请求,返回结果的 future
      21.         final Future<String> result = client.query(key);
      22.         // set the callback to be executed once the request by the client is complete
      23.         // the callback simply forwards the result to the result future
      24.         // 设置请求完成时的回调.将结果传递给 result future
      25.         CompletableFuture.supplyAsync(new Supplier<String>() {
      26.         
      27.             @Override
      28.             public String get() {
      29.                 try {
      30.                     return result.get();
      31.                 } catch (InterruptedException | ExecutionException e) {
      32.                     // Normally handled explicitly.
      33.                     return null;
      34.                 }
      35.             }
      36.         }).thenAccept( (String dbResult) -> {
      37.             resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
      38.         });
      39.     }
      40. }
      41. // create the original stream
      42. // 创建一个原始的流
      43. DataStream<String> stream = ...;
      44. // apply the async I/O transformation
      45. // 添加一个 async I/O ,指定超时时间,和进行中的异步请求的最大数量
      46. DataStream<Tuple2<String, String>> resultStream =
      47.     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
      复制代码
    • 注意事项

      • Timeout:定义请求超时时间,异步请求多久没完成会被认为是超时了
      • Capacity:定义了同时进行的异步请求的数量,可以限制并发请求数量,不会积压过多的请求
      • 超时处理:默认当一个异步 I/O 请求超时时,会引发异常并重新启动作业。 如果要处理超时,可以覆盖该AsyncFunction的timeout方法来自定义超时之后的处理方式
      • 响应结果的顺序:AsyncDataStream包含两种输出模式,

        • unorderedWait无序:响应结果的顺序与异步请求的顺序不同
        • orderedWait有序:响应结果的顺序与异步请求的顺序相同




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

千千梦丶琪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表