MoreExecutors
directExecutor
- ExecutorService executor = Executors.newSingleThreadExecutor();
- SettableFuture<Integer> future = SettableFuture.create();
- // 使用其他线程去 set 对应的结果。
- executor.submit(() -> {
- future.set(1);
- });
- Futures.addCallback(future, new FutureCallback<>() {
- @Override
- public void onSuccess(Integer result) {
- // main线程执行的
- System.out.println("result=" + result + "线程名:" + Thread.currentThread().getName());//main
- }
- @Override
- public void onFailure(Throwable t) {
- }
- }, MoreExecutors.directExecutor());
复制代码 执行 callback 的线程池这里指定为 MoreExecutors#directExecutor ,那么这里执行打印 result 的线程是主线程
在 MoreExecutors#directExecutor 中,可以看到界说是如许的:- public final class MoreExecutors {
- // 省略了类内其他成员
- public static Executor directExecutor() {
- return DirectExecutor.INSTANCE;
- }
- }
复制代码 以及- @GwtCompatible
- @ElementTypesAreNonnullByDefault
- enum DirectExecutor implements Executor {
- INSTANCE;
- @Override
- public void execute(Runnable command) {
- command.run();
- }
- @Override
- public String toString() {
- return "MoreExecutors.directExecutor()";
- }
- }
复制代码 MoreExecutors#directExecutor 其实是一个假的线程池,表示直接执行。
再看下面这个例子:- ExecutorService executor = Executors.newSingleThreadExecutor();
- SettableFuture<Integer> future = SettableFuture.create();
- // 使用其他线程去 set 对应的结果。
- executor.submit(() -> {
- // 增加线程 sleep 的逻辑。
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- future.set(1);
- });
- Futures.addCallback(future, new FutureCallback<>() {
- @Override
- public void onSuccess(Integer result) {
- // 此时就会被 executor 的线程执行
- System.out.println("result=" + result + "线程名:" + Thread.currentThread().getName());//此时还未打印出来,主线程就结束了
- }
- @Override
- public void onFailure(Throwable t) {
- }
- }, MoreExecutors.directExecutor());
复制代码
那么这里清晰了:
- 如果 future 已经完成,那么 MoreExecutor#directExecutor 表示当前线程;
- 如果 future 未完成,那么 MoreExecutor#directExecutor 就是将来完成 future 的线程。
因此其实详细执行回调的线程某种水平上是不确定的
ListenableFuture
引言
jdk原生的future已经提供了异步操作,但是不能直接回调。guava对future进行了增强,核心接口就是ListenableFuture。JDK8从guava中吸收了精华新增的类CompletableFuture,也可以直接看这个类的学习。
JUC 的 Future 接口提供了一种异步获取使命执行结果的机制,表示一个异步计算的结果。- ExecutorService executor = Executors.newFixedThreadPool(1);
- Future<String> future = executor.submit(() -> {
- // 执行异步任务,返回一个结果
- return "Task completed";
- });
- // Blocked
- String result = future.get();
复制代码 Executor 实际返回的是实现类 FutureTask,它同时实现了 Runnable 接口,因此可以手动创建异步使命。- FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return "Hello";
- }
- });
-
- new Thread(futureTask).start();
- System.out.println(futureTask.get());
复制代码 而 Guava 提供的 ListenableFuture 更进一步,允许注册回调,在使命完成后自动执行,实际也是使用它的实现类 ListenableFutureTask。- // 装饰原始的线程池
- ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
- ListenableFuture<String> future = listeningExecutorService.submit(() -> {
- // int i = 1 / 0;
- return "Hello";
- });
- // 添加回调 1
- Futures.addCallback(future, new FutureCallback<String>() {
- // 任务成功时的回调
- @Override
- public void onSuccess(String result) {
- System.out.println(result);
- }
- // 任务失败时的回调
- @Override
- public void onFailure(Throwable t) {
- System.out.println("Error: " + t.getMessage());
- }
- }, listeningExecutorService);
- // 添加回调 2
- future.addListener(new Runnable() {
- @Override
- public void run() {
- System.out.println("Done");
- }
- }, listeningExecutorService);
复制代码 回调源码分析
先看下ListenableFuture接口界说:- public interface ListenableFuture<V> extends Future<V> {
- void addListener(Runnable listener, Executor executor);
- }
复制代码 可以看到,这个接口在Future接口的底子上增长了addListener方法,允许我们注册回调函数。固然,在编程时可能不会直接使用这个接口,由于这个接口只能传Runnable实例。
addListener方法
- @Override
- public void addListener(Runnable listener, Executor executor) {
- checkNotNull(listener, "Runnable was null.");
- checkNotNull(executor, "Executor was null.");
- // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for
- // addListener says that listeners execute 'immediate' if the future isDone(). However, our
- // protocol for completing a future is to assign the value field (which sets isDone to true) and
- // then to release waiters, followed by executing afterDone(), followed by releasing listeners.
- // That means that it is possible to observe that the future isDone and that your listeners
- // don't execute 'immediately'. By checking isDone here we avoid that.
- // A corollary to all that is that we don't need to check isDone inside the loop because if we
- // get into the loop we know that we weren't done when we entered and therefore we aren't under
- // an obligation to execute 'immediately'.
- if (!isDone()) {
- Listener oldHead = listeners; // 获取当前监听器的头结点
- if (oldHead != Listener.TOMBSTONE) {// 检查当前的头节点是否是TOMBSTONE。TOMBSTONE用来表示监听器列表不再接受新的监听器,通常是因为Future已经完成。
- Listener newNode = new Listener(listener, executor);//通过这个listener新增一个一个节点,节点中包含executor
- do {
- newNode.next = oldHead;//将newNode.next指向当前头结点,此时newNode就是头结点
- if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {//检查头节点是否更新成功
- return;//更新成功就可以返回了
- }
- oldHead = listeners; // 重新执行 头插法
- } while (oldHead != Listener.TOMBSTONE);// 如果头节点变成了TOMBSTONE,则退出循环;并且
- }
- }
- // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
- // the listener.
- executeListener(listener, executor);//执行到这里意味着监听器TOMBSTONE就设置好了,也就是future已经完成,可以直接调用监听器
- }
复制代码 这里其实就是在添加listener的方法中首先检查Future是否已经完成:
- 如果Future已经完成,那么就没有必要添加新的监听器,直接executeListener。
- 如果future没有完成,那么会新建一个Listener节点,并插入到链表头部(Listener就是一个链表)
如果已经完成,会直接执行executeListner 方法- private static void executeListener(Runnable runnable, Executor executor) {
- try {
- executor.execute(runnable);//直接使用listener拥有的线程executor执行
- } catch (Exception e) { // sneaky checked exception
- // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if
- // we're given a bad one. We only catch Exception because we want Errors to propagate up.
- log.get()
- .log(
- Level.SEVERE,
- "RuntimeException while executing runnable "
- + runnable
- + " with executor "
- + executor,
- e);
- }
- }
复制代码那么如果没有完成呢,在listener链表中的什么时候会执行?看后续的回调函数的触发内容
addCallback方法
Futures类还提供了另一个回调方法:addCallback方法- public static <V> void addCallback(
- final ListenableFuture<V> future,
- final FutureCallback<? super V> callback,
- Executor executor) {
- Preconditions.checkNotNull(callback);
- future.addListener(new CallbackListener<V>(future, callback), executor);//调用了addListener方法
- }
复制代码 那哪些方法会来调用这个complete方法呢?
Service
Guava 的 Service 框架是一个用于管理服务生命周期的轻量级框架,可以资助我们把异步操作封装成一个Service服务。让这个服务有了运行状态(也可以理解成生命周期),如许可以实时了解当前服务的运行状态。同时还可以添加监听器来监听服务运行状态之间的变化。
Guava里面的服务有五种状态,如下所示:
- Service.State.NEW: 服务创建状态
- Service.State.STARTING: 服务启动中
- Service.State.RUNNING:服务启动完成,正在运行中
- Service.State.STOPPING: 服务停止中
- Service.State.TERMINATED: 服务停止完成,竣事
全部的服务都需要实现Service接口,里面包罗了服务需要实现的一些基本方法,以下是Service接口:- private static final class CallbackListener<V> implements Runnable {
- final Future<V> future;
- final FutureCallback<? super V> callback;
-
- CallbackListener(Future<V> future, FutureCallback<? super V> callback) {
- this.future = future;
- this.callback = callback;
- }
-
- @Override
- public void run() {//回调时的逻辑
- if (future instanceof InternalFutureFailureAccess) {
- Throwable failure =
- InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future);
- if (failure != null) {
- callback.onFailure(failure);
- return;
- }
- }
- final V value;
- try {
- value = getDone(future);//获取返回值
- } catch (ExecutionException e) {
- callback.onFailure(e.getCause());//如果发生了异常,则会调用onFailure方法通知异常
- return;
- } catch (RuntimeException | Error e) {
- callback.onFailure(e);//如果发生了异常,则会调用onFailure方法通知异常
- return;
- }
- callback.onSuccess(value);//将返回值调用FutureCallback实例的onSuccess方法执行注册的回调逻辑
- }
- }
复制代码 那应该怎么来使用Service,需要实现的异步逻辑包装成服务呢.Guava里面已经给提供了三个底子实现类:
- AbstractService
- AbstractExecutionThreadService
- AbstractScheduledService
AbstractExecutionThreadService
AbstractExecutionThreadService可以把一个详细的异步操作封装成Service服务。说白了就是把之前在线程的实现逻辑封装成服务,把之前线程的详细实现逻辑搬到AbstractExecutionThreadService的实现方法run()方法去执行。
常用方法介绍
首先AbstractExecutionThreadService实现了Service,Service的方法在AbstractExecutionThreadService里面都有,AbstractExecutionThreadService新加了一些方法。如下所示:- private static void complete(AbstractFuture<?> param) {
- // Declare a "true" local variable so that the Checker Framework will infer nullness.
- AbstractFuture<?> future = param;//获取future
- Listener next = null;
- outer:
- while (true) {
- future.releaseWaiters();//通知所有执行的方法
- // We call this before the listeners in order to avoid needing to manage a separate stack data
- // structure for them. Also, some implementations rely on this running prior to listeners
- // so that the cleanup work is visible to listeners.
- // afterDone() should be generally fast and only used for cleanup work... but in theory can
- // also be recursive and create StackOverflowErrors
- future.afterDone();
- // push the current set of listeners onto next
- next = future.clearListeners(next);//反转listener链表
- future = null;
- while (next != null) {
- Listener curr = next;//获取当前listener
- next = next.next;
- /*
- * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
- * clearListeners.
- */
- Runnable task = requireNonNull(curr.task);
- if (task instanceof SetFuture) {
- SetFuture<?> setFuture = (SetFuture<?>) task;
- // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
- // chains of SetFutures
- // Handling this special case is important because there is no way to pass an executor to
- // setFuture, so a user couldn't break the chain by doing this themselves. It is also
- // potentially common if someone writes a recursive Futures.transformAsync transformer.
- future = setFuture.owner;
- if (future.value == setFuture) {
- Object valueToSet = getFutureValue(setFuture.future);
- if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
- continue outer;
- }
- }
- // other wise the future we were trying to set is already done.
- } else {
- /*
- * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
- * clearListeners.
- */
- executeListener(task, requireNonNull(curr.executor));// 交给listener拥有的线程池进行处理
- }
- }
- break;
- }
- }
复制代码 AbstractExecutionThreadService类里面最重要的就是run()方法了,这个方法是服务需要详细实现的方法,服务需要处理的详细逻辑在这个方法里面做。
详细使用
- public interface Service {
- //启动当前服务,只有当服务的状态是NEW的情况下才可以启动,否则抛出IllegalStateException异常
- @CanIgnoreReturnValue
- Service startAsync();
- //判断当前服务是否处在运行状态 (RUNNING)
- boolean isRunning();
- //获取当前服务的状态
- Service.State state();
- //停止当前服务
- @CanIgnoreReturnValue
- Service stopAsync();
- // 等待当前服务到达RUNNING状态
- void awaitRunning();
- // 在指定的时间内等待当前服务到达RUNNING状态,如果在指定时间没有达到则抛出TimeoutException
- void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
- // 等待当前服务到达TERMINATED状态
- void awaitTerminated();
- //在指定的时间内等待当前服务达到TERMINATED状态,
- void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
- // 获取服务器失败的原因,在服务是FAILED的状态的时候调用该函数,否则抛出IllegalStateException异常
- Throwable failureCause();
- //监听当前服务的状态改变,executor参数表示,监听回调函数在哪里执行
- void addListener(Service.Listener listener, Executor executor);
- }
复制代码 AbstractScheduledService
AbstractScheduledService可以把周期性的使命封装成一个服务。线程池也有一个周期性的线程池么,两者是一一对应的.
常用方法介绍
AbstractScheduledService也是一个服务所以Service里面的方法AbstractScheduledService也都有,同时,AbstractScheduledService也新增了一些其它方法- public class AbstractExecutionThreadService {
- // 开始执行服务逻辑的时候会调用,可以在里面做一些初始化的操作
- protected void startUp() throws Exception;
- // 当前服务需要执行的具体逻辑
- protected abstract void run() throws Exception;
- // 服务停止之后会调用的函数,可以在里面做 一些释放资源的处理
- protected void shutDown() throws Exception {}
- //比如在run方法里面有一个无线循环,可以在这个方法里面置状态,退出无线循环,让服务真正停止
- //调stopAsync函数的时候,会调用该方法
- protected void triggerShutdown() {}
-
- ...
- }
复制代码 详细使用
自界说一个类继承AbstractScheduledService,实现一个非常简朴的周期性使命.- public class AbstractExecutionThreadServiceImpl extends AbstractExecutionThreadService {
- private volatile boolean running = true; //声明一个状态
- @Override
- protected void startUp() {
- //TODO: 做一些初始化操作
- }
- @Override
- public void run() {
- // 具体需要实现的业务逻辑,会在线程中执行
- while (running) {
- try {
- // 等待2s
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
- System.out.println("do our work.....");
- } catch (Exception e) {
- //TODO: 处理异常,这里如果抛出异常,会使服务状态变为failed同时导致任务终止。
- }
- }
- }
- @Override
- protected void triggerShutdown() {
- //TODO: 如果的run方法中有无限循环,可以在这里置状态,让其退出无限循环,stopAsync()里面会调用到该方法
- running = false; //这里改变状态值,run方法中就能够得到响应。
- }
- @Override
- protected void shutDown() throws Exception {
- //TODO: 可以做一些清理操作,比如关闭连接。shutDown() 是在线程的具体实现里面调用的
- }
- }
复制代码 ServiceManager
ServiceManager是用来管理多个服务的,让对多个服务的操作变的更加方便,比如可以同时去启动多个服务,同时去停止多个服务等等。
常用方法介绍
[code]public class ServiceManager { //构造函数,管理多个Service服务 public ServiceManager(Iterable |