ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Future和CompletableFuture区别
[打印本页]
作者:
千千梦丶琪
时间:
2023-8-30 22:52
标题:
Future和CompletableFuture区别
Future :获取异步返回的结果需要使用轮询的方式,消耗cup
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future";
});
while(true){
if(future.isDone()){
System.out.println(future.get());
break;
}
}
复制代码
CompletableFuture:采用观察者模式,阻塞获取异步返回的结果,性能得到优化
System.out.println("=============CompletableFuture===================");
CompletableFuture testFuture1 = CompletableFuture.supplyAsync(()->{
return "丽丽1";
}).thenApply((element)->{
System.out.println("testFuture1后续操作:"+element);
return "丽丽2";
});
System.out.println(testFuture1.get());
System.out.println("=============CompletableFuture===================");
CompletableFuture testFuture2 = CompletableFuture.supplyAsync(()->{
return "丽丽1";
}).thenAccept((element)->{
System.out.println("testFuture2后续操作:"+element);
});
System.out.println(testFuture2.get());
复制代码
CompletableFuture的使用明细
官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio
* runAsync 无返回值
* supplyAsync 有返回值
*
* thenAccept 无返回值
* thenApply 有返回值
* thenRun 不关心上一步执行结果,执行下一个操作
* get() 为阻塞获取 可设置超时时间 避免长时间阻塞
复制代码
实现接口 AsyncFunction 用于请求分发
定义一个callback回调函数,该函数用于取出异步请求的返回结果,并将返回的结果传递给ResultFuture
对DataStream的数据使用Async操作
复制代码
例子
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
* 通过向数据库发送异步请求并设置回调方法
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks
可以异步请求的特定数据库的客户端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
// 发起一个异步请求,返回结果的 future
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// 设置请求完成时的回调.将结果传递给 result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
// 创建一个原始的流
DataStream<String> stream = ...;
// apply the async I/O transformation
// 添加一个 async I/O ,指定超时时间,和进行中的异步请求的最大数量
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
复制代码
注意事项
Timeout
:定义请求超时时间,异步请求多久没完成会被认为是超时了
Capacity
:定义了同时进行的异步请求的数量,可以限制并发请求数量,不会积压过多的请求
超时处理
:默认当一个异步 I/O 请求超时时,会引发异常并重新启动作业。 如果要处理超时,可以覆盖该AsyncFunction的timeout方法来自定义超时之后的处理方式
响应结果的顺序
:AsyncDataStream包含两种输出模式,
unorderedWait无序:响应结果的顺序与异步请求的顺序不同
orderedWait有序:响应结果的顺序与异步请求的顺序相同
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4