【Guava】并发编程ListenableFuture&Service

打印 上一主题 下一主题

主题 1837|帖子 1837|积分 5511

MoreExecutors

directExecutor
  1. ExecutorService executor = Executors.newSingleThreadExecutor();
  2. SettableFuture<Integer> future = SettableFuture.create();
  3. // 使用其他线程去 set 对应的结果。
  4. executor.submit(() -> {
  5.     future.set(1);
  6. });
  7. Futures.addCallback(future, new FutureCallback<>() {
  8.     @Override
  9.     public void onSuccess(Integer result) {
  10.         // main线程执行的
  11.         System.out.println("result=" + result + "线程名:" + Thread.currentThread().getName());//main
  12.     }
  13.     @Override
  14.     public void onFailure(Throwable t) {
  15.     }
  16. }, MoreExecutors.directExecutor());
复制代码
执行 callback 的线程池这里指定为 MoreExecutors#directExecutor ,那么这里执行打印 result 的线程是主线程

在 MoreExecutors#directExecutor 中,可以看到界说是如许的:
  1. public final class MoreExecutors {
  2.     // 省略了类内其他成员
  3.     public static Executor directExecutor() {
  4.         return DirectExecutor.INSTANCE;
  5.   }
  6. }
复制代码
以及
  1. @GwtCompatible
  2. @ElementTypesAreNonnullByDefault
  3. enum DirectExecutor implements Executor {
  4.   INSTANCE;
  5.   @Override
  6.   public void execute(Runnable command) {
  7.     command.run();
  8.   }
  9.   @Override
  10.   public String toString() {
  11.     return "MoreExecutors.directExecutor()";
  12.   }
  13. }
复制代码
MoreExecutors#directExecutor 其实是一个假的线程池,表示直接执行。
再看下面这个例子:
  1. ExecutorService executor = Executors.newSingleThreadExecutor();
  2. SettableFuture<Integer> future = SettableFuture.create();
  3. // 使用其他线程去 set 对应的结果。
  4. executor.submit(() -> {
  5.     // 增加线程 sleep 的逻辑。
  6.     try {
  7.         Thread.sleep(1000);
  8.     } catch (InterruptedException e) {
  9.         e.printStackTrace();
  10.     }
  11.     future.set(1);
  12. });
  13. Futures.addCallback(future, new FutureCallback<>() {
  14.     @Override
  15.     public void onSuccess(Integer result) {
  16.         // 此时就会被 executor 的线程执行
  17.         System.out.println("result=" + result + "线程名:" + Thread.currentThread().getName());//此时还未打印出来,主线程就结束了
  18.     }
  19.     @Override
  20.     public void onFailure(Throwable t) {
  21.     }
  22. }, MoreExecutors.directExecutor());
复制代码

那么这里清晰了:

  • 如果 future 已经完成,那么 MoreExecutor#directExecutor 表示当前线程;
  • 如果 future 未完成,那么 MoreExecutor#directExecutor 就是将来完成 future 的线程。
因此其实详细执行回调的线程某种水平上是不确定的
ListenableFuture

引言

jdk原生的future已经提供了异步操作,但是不能直接回调。guava对future进行了增强,核心接口就是ListenableFuture。JDK8从guava中吸收了精华新增的类CompletableFuture,也可以直接看这个类的学习。
JUC 的 Future 接口提供了一种异步获取使命执行结果的机制,表示一个异步计算的结果。
  1. ExecutorService executor = Executors.newFixedThreadPool(1);
  2. Future<String> future = executor.submit(() -> {
  3.     // 执行异步任务,返回一个结果
  4.     return "Task completed";
  5. });
  6. // Blocked
  7. String result = future.get();
复制代码
Executor 实际返回的是实现类 FutureTask,它同时实现了 Runnable 接口,因此可以手动创建异步使命。
  1. FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  2.     @Override
  3.     public String call() throws Exception {
  4.         return "Hello";
  5.     }
  6. });
  7.         
  8. new Thread(futureTask).start();
  9. System.out.println(futureTask.get());
复制代码
而 Guava 提供的 ListenableFuture 更进一步,允许注册回调,在使命完成后自动执行,实际也是使用它的实现类 ListenableFutureTask。
  1. // 装饰原始的线程池
  2. ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
  3. ListenableFuture<String> future = listeningExecutorService.submit(() -> {
  4.     // int i = 1 / 0;
  5.     return "Hello";
  6. });
  7. // 添加回调 1
  8. Futures.addCallback(future, new FutureCallback<String>() {
  9.     // 任务成功时的回调
  10.     @Override
  11.     public void onSuccess(String result) {
  12.         System.out.println(result);
  13.     }
  14.     // 任务失败时的回调
  15.     @Override
  16.     public void onFailure(Throwable t) {
  17.         System.out.println("Error: " + t.getMessage());
  18.     }
  19. }, listeningExecutorService);
  20. // 添加回调 2
  21. future.addListener(new Runnable() {
  22.     @Override
  23.     public void run() {
  24.         System.out.println("Done");
  25.     }
  26. }, listeningExecutorService);
复制代码
回调源码分析

先看下ListenableFuture接口界说:
  1. public interface ListenableFuture<V> extends Future<V> {
  2.     void addListener(Runnable listener, Executor executor);
  3. }
复制代码
可以看到,这个接口在Future接口的底子上增长了addListener方法,允许我们注册回调函数。固然,在编程时可能不会直接使用这个接口,由于这个接口只能传Runnable实例。
addListener方法
  1. @Override
  2.   public void addListener(Runnable listener, Executor executor) {
  3.     checkNotNull(listener, "Runnable was null.");
  4.     checkNotNull(executor, "Executor was null.");
  5.     // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for
  6.     // addListener says that listeners execute 'immediate' if the future isDone(). However, our
  7.     // protocol for completing a future is to assign the value field (which sets isDone to true) and
  8.     // then to release waiters, followed by executing afterDone(), followed by releasing listeners.
  9.     // That means that it is possible to observe that the future isDone and that your listeners
  10.     // don't execute 'immediately'.  By checking isDone here we avoid that.
  11.     // A corollary to all that is that we don't need to check isDone inside the loop because if we
  12.     // get into the loop we know that we weren't done when we entered and therefore we aren't under
  13.     // an obligation to execute 'immediately'.
  14.     if (!isDone()) {
  15.       Listener oldHead = listeners; // 获取当前监听器的头结点
  16.       if (oldHead != Listener.TOMBSTONE) {// 检查当前的头节点是否是TOMBSTONE。TOMBSTONE用来表示监听器列表不再接受新的监听器,通常是因为Future已经完成。
  17.         Listener newNode = new Listener(listener, executor);//通过这个listener新增一个一个节点,节点中包含executor
  18.         do {
  19.           newNode.next = oldHead;//将newNode.next指向当前头结点,此时newNode就是头结点
  20.           if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {//检查头节点是否更新成功
  21.             return;//更新成功就可以返回了
  22.           }
  23.           oldHead = listeners; // 重新执行 头插法
  24.         } while (oldHead != Listener.TOMBSTONE);// 如果头节点变成了TOMBSTONE,则退出循环;并且
  25.       }
  26.     }
  27.     // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
  28.     // the listener.
  29.     executeListener(listener, executor);//执行到这里意味着监听器TOMBSTONE就设置好了,也就是future已经完成,可以直接调用监听器
  30.   }
复制代码
这里其实就是在添加listener的方法中首先检查Future是否已经完成:

  • 如果Future已经完成,那么就没有必要添加新的监听器,直接executeListener。
  • 如果future没有完成,那么会新建一个Listener节点,并插入到链表头部(Listener就是一个链表)
如果已经完成,会直接执行executeListner 方法
  1. private static void executeListener(Runnable runnable, Executor executor) {
  2.   try {
  3.     executor.execute(runnable);//直接使用listener拥有的线程executor执行
  4.   } catch (Exception e) { // sneaky checked exception
  5.     // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if
  6.     // we're given a bad one. We only catch Exception because we want Errors to propagate up.
  7.     log.get()
  8.         .log(
  9.             Level.SEVERE,
  10.             "RuntimeException while executing runnable "
  11.                 + runnable
  12.                 + " with executor "
  13.                 + executor,
  14.             e);
  15.   }
  16. }
复制代码
那么如果没有完成呢,在listener链表中的什么时候会执行?看后续的回调函数的触发内容
addCallback方法

Futures类还提供了另一个回调方法:addCallback方法
  1. public static <V> void addCallback(
  2.   final ListenableFuture<V> future,
  3.   final FutureCallback<? super V> callback,
  4.   Executor executor) {
  5.         Preconditions.checkNotNull(callback);
  6.         future.addListener(new CallbackListener<V>(future, callback), executor);//调用了addListener方法
  7. }
复制代码
那哪些方法会来调用这个complete方法呢?

Service

Guava 的 Service 框架是一个用于管理服务生命周期的轻量级框架,可以资助我们把异步操作封装成一个Service服务。让这个服务有了运行状态(也可以理解成生命周期),如许可以实时了解当前服务的运行状态。同时还可以添加监听器来监听服务运行状态之间的变化。
Guava里面的服务有五种状态,如下所示:

  • Service.State.NEW: 服务创建状态
  • Service.State.STARTING: 服务启动中
  • Service.State.RUNNING:服务启动完成,正在运行中
  • Service.State.STOPPING: 服务停止中
  • Service.State.TERMINATED: 服务停止完成,竣事
全部的服务都需要实现Service接口,里面包罗了服务需要实现的一些基本方法,以下是Service接口:
  1. private static final class CallbackListener<V> implements Runnable {
  2.     final Future<V> future;
  3.     final FutureCallback<? super V> callback;
  4.     CallbackListener(Future<V> future, FutureCallback<? super V> callback) {
  5.       this.future = future;
  6.       this.callback = callback;
  7.     }
  8.     @Override
  9.     public void run() {//回调时的逻辑
  10.       if (future instanceof InternalFutureFailureAccess) {
  11.         Throwable failure =
  12.             InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future);
  13.         if (failure != null) {
  14.           callback.onFailure(failure);
  15.           return;
  16.         }
  17.       }
  18.       final V value;
  19.       try {
  20.         value = getDone(future);//获取返回值
  21.       } catch (ExecutionException e) {
  22.         callback.onFailure(e.getCause());//如果发生了异常,则会调用onFailure方法通知异常
  23.         return;
  24.       } catch (RuntimeException | Error e) {
  25.         callback.onFailure(e);//如果发生了异常,则会调用onFailure方法通知异常
  26.         return;
  27.       }
  28.       callback.onSuccess(value);//将返回值调用FutureCallback实例的onSuccess方法执行注册的回调逻辑
  29.     }
  30. }
复制代码
那应该怎么来使用Service,需要实现的异步逻辑包装成服务呢.Guava里面已经给提供了三个底子实现类:

  • AbstractService
  • AbstractExecutionThreadService
  • AbstractScheduledService
AbstractExecutionThreadService

AbstractExecutionThreadService可以把一个详细的异步操作封装成Service服务。说白了就是把之前在线程的实现逻辑封装成服务,把之前线程的详细实现逻辑搬到AbstractExecutionThreadService的实现方法run()方法去执行。
常用方法介绍

首先AbstractExecutionThreadService实现了Service,Service的方法在AbstractExecutionThreadService里面都有,AbstractExecutionThreadService新加了一些方法。如下所示:
  1. private static void complete(AbstractFuture<?> param) {
  2.     // Declare a "true" local variable so that the Checker Framework will infer nullness.
  3.     AbstractFuture<?> future = param;//获取future
  4.     Listener next = null;
  5.     outer:
  6.     while (true) {
  7.       future.releaseWaiters();//通知所有执行的方法
  8.       // We call this before the listeners in order to avoid needing to manage a separate stack data
  9.       // structure for them.  Also, some implementations rely on this running prior to listeners
  10.       // so that the cleanup work is visible to listeners.
  11.       // afterDone() should be generally fast and only used for cleanup work... but in theory can
  12.       // also be recursive and create StackOverflowErrors
  13.       future.afterDone();
  14.       // push the current set of listeners onto next
  15.       next = future.clearListeners(next);//反转listener链表
  16.       future = null;
  17.       while (next != null) {
  18.         Listener curr = next;//获取当前listener
  19.         next = next.next;
  20.         /*
  21.          * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
  22.          * clearListeners.
  23.          */
  24.         Runnable task = requireNonNull(curr.task);
  25.         if (task instanceof SetFuture) {
  26.           SetFuture<?> setFuture = (SetFuture<?>) task;
  27.           // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
  28.           // chains of SetFutures
  29.           // Handling this special case is important because there is no way to pass an executor to
  30.           // setFuture, so a user couldn't break the chain by doing this themselves.  It is also
  31.           // potentially common if someone writes a recursive Futures.transformAsync transformer.
  32.           future = setFuture.owner;
  33.           if (future.value == setFuture) {
  34.             Object valueToSet = getFutureValue(setFuture.future);
  35.             if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
  36.               continue outer;
  37.             }
  38.           }
  39.           // other wise the future we were trying to set is already done.
  40.         } else {
  41.           /*
  42.            * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
  43.            * clearListeners.
  44.            */
  45.           executeListener(task, requireNonNull(curr.executor));// 交给listener拥有的线程池进行处理
  46.         }
  47.       }
  48.       break;
  49.     }
  50.   }
复制代码
AbstractExecutionThreadService类里面最重要的就是run()方法了,这个方法是服务需要详细实现的方法,服务需要处理的详细逻辑在这个方法里面做。
详细使用
  1. public interface Service {
  2.         //启动当前服务,只有当服务的状态是NEW的情况下才可以启动,否则抛出IllegalStateException异常
  3.         @CanIgnoreReturnValue
  4.         Service startAsync();
  5.         //判断当前服务是否处在运行状态 (RUNNING)
  6.         boolean isRunning();
  7.         //获取当前服务的状态
  8.         Service.State state();
  9.         //停止当前服务
  10.         @CanIgnoreReturnValue
  11.         Service stopAsync();
  12.         // 等待当前服务到达RUNNING状态
  13.         void awaitRunning();
  14.         // 在指定的时间内等待当前服务到达RUNNING状态,如果在指定时间没有达到则抛出TimeoutException
  15.         void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
  16.         // 等待当前服务到达TERMINATED状态
  17.         void awaitTerminated();
  18.         //在指定的时间内等待当前服务达到TERMINATED状态,
  19.         void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
  20.         // 获取服务器失败的原因,在服务是FAILED的状态的时候调用该函数,否则抛出IllegalStateException异常
  21.         Throwable failureCause();
  22.         //监听当前服务的状态改变,executor参数表示,监听回调函数在哪里执行
  23.         void addListener(Service.Listener listener, Executor executor);
  24. }
复制代码
AbstractScheduledService

AbstractScheduledService可以把周期性的使命封装成一个服务。线程池也有一个周期性的线程池么,两者是一一对应的.
常用方法介绍

AbstractScheduledService也是一个服务所以Service里面的方法AbstractScheduledService也都有,同时,AbstractScheduledService也新增了一些其它方法
  1. public class AbstractExecutionThreadService {
  2.         // 开始执行服务逻辑的时候会调用,可以在里面做一些初始化的操作
  3.         protected void startUp() throws Exception;
  4.         // 当前服务需要执行的具体逻辑
  5.         protected abstract void run() throws Exception;
  6.         // 服务停止之后会调用的函数,可以在里面做 一些释放资源的处理
  7.         protected void shutDown() throws Exception {}
  8.     //比如在run方法里面有一个无线循环,可以在这个方法里面置状态,退出无线循环,让服务真正停止
  9.         //调stopAsync函数的时候,会调用该方法
  10.         protected void triggerShutdown() {}
  11.        
  12.         ...
  13. }
复制代码
详细使用

自界说一个类继承AbstractScheduledService,实现一个非常简朴的周期性使命.
  1. public class AbstractExecutionThreadServiceImpl extends AbstractExecutionThreadService {
  2.         private volatile boolean running = true; //声明一个状态
  3.         @Override
  4.         protected void startUp() {
  5.                 //TODO: 做一些初始化操作
  6.         }
  7.         @Override
  8.         public void run() {
  9.                 // 具体需要实现的业务逻辑,会在线程中执行
  10.                 while (running) {
  11.                         try {
  12.                                 // 等待2s
  13.                                 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
  14.                                 System.out.println("do our work.....");
  15.                         } catch (Exception e) {
  16.                                 //TODO: 处理异常,这里如果抛出异常,会使服务状态变为failed同时导致任务终止。
  17.                         }
  18.                 }
  19.         }
  20.         @Override
  21.         protected void triggerShutdown() {
  22.                 //TODO: 如果的run方法中有无限循环,可以在这里置状态,让其退出无限循环,stopAsync()里面会调用到该方法
  23.                 running = false; //这里改变状态值,run方法中就能够得到响应。
  24.         }
  25.         @Override
  26.         protected void shutDown() throws Exception {
  27.                 //TODO: 可以做一些清理操作,比如关闭连接。shutDown() 是在线程的具体实现里面调用的
  28.         }
  29. }
复制代码
ServiceManager

ServiceManager是用来管理多个服务的,让对多个服务的操作变的更加方便,比如可以同时去启动多个服务,同时去停止多个服务等等。
常用方法介绍

[code]public class ServiceManager {        //构造函数,管理多个Service服务        public ServiceManager(Iterable

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表