Flink-异步算子AsyncFunction利用

打印 上一主题 下一主题

主题 935|帖子 935|积分 2809

配景

        在Flink使命中必要将盘算结果数据先存入HBase中(Phoenix),在保证HBase写入成功后再转发到Kafka中。在数据量较大的情况下由于串行写入HBase会出现严峻反压,造成整体数据盘算链路的整体数据产出效率大大低落。
目标

        利用异步算子与外部系统交互,提高吞吐量,低落由于写入数据库造成的耽误。
描述

        在本地测试时验证出,一条数据写入的HBase的哀求完备耗时在40ms左右(仅作为参考,不怜悯况耗时差异较大),但是在Flink使命中上游算子的耗时基本都在几毫秒内,因此写入HBase是整体链路的显着短板。在一个写入HBase的Task中,由于写入哀求是同步执行的,在大部分的情况下等待哀求相应占据了该算子的大部分时间,因此通过并发的实现处理多个哀求和接收多个响应,将等待的时间分摊到每个哀求。
            注:仅仅提高Function的并行度(parallelism)在有些情况下也可以提升吞吐量,但是如许做通常会导致非常高的资源斲丧:更多的并行 Function 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络毗连、 更多的与数据库的网络毗连、更多的缓冲和更多程序内部协调的开销。
                                                                                                  --Apache Flink Documentation
  
两个参数控制异步操作:
Timeout: 超时参数界说了异步哀求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的哀求。
Capacity: 容量参数界说了可以同时进行的异步哀求数。 即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 限定并发哀求的数量可以确保算子不会连续累积待处理的哀求进而造成积压,而是在容量耗尽时触发反压。
留意点:


  • 在利用与外部数据库交互的客户端时,需利用支持异步操作的客户端,若没有则可以利用线程池实现异步提交的哀求,若利用同步的客户端仍会壅闭当前异步算子。
  • 在利用异步算子时,Capacity参数、线程池最大毗连数、数据库毗连池最大毗连数要合理设置,克制由于某个参数设置的过小使哀求的并发数受限,无法达到期望并发。
  •  官方文档的中已对 超时处理、结果的顺序、事件时间、容错保证做了明白的解释,放在文档最后便于查看。
代码

  1. package app;
  2. import com.alibaba.fastjson.JSONObject;
  3. import functions.MyAsyncFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import java.util.concurrent.TimeUnit;
  9. public class AsyncTestJob {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         SingleOutputStreamOperator<JSONObject> stream = env.socketTextStream("localhost", 8888)
  13.                 .map((MapFunction<String, JSONObject>) JSONObject::parseObject);
  14.         
  15.         AsyncDataStream.orderedWait(stream,
  16.                 //使用异步算子
  17.                 new MyAsyncFunction(50),
  18.                 //超时时间
  19.                 30000,
  20.                 //超时时间单位
  21.                 TimeUnit.MILLISECONDS,
  22.                 //capacity为异步请求数参数。建议:该参数应小于等于线程池、连接池的最大连接数
  23.                 50)
  24.                 .print();
  25.         env.execute();
  26.     }
  27. }
复制代码
  1. package functions;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zaxxer.hikari.HikariConfig;
  4. import com.zaxxer.hikari.HikariDataSource;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.functions.async.ResultFuture;
  7. import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.sql.Connection;
  11. import java.sql.PreparedStatement;
  12. import java.sql.SQLException;
  13. import java.util.Collections;
  14. import java.util.Properties;
  15. import java.util.concurrent.*;
  16. public class MyAsyncFunction extends RichAsyncFunction<JSONObject, String> {
  17.     private final int maxConnTotal;
  18.     private transient ExecutorService executorService;
  19.     private HikariDataSource phoenixDataSource;
  20.     private final static Logger logger = LoggerFactory.getLogger(AsyncUpsertHBase.class);
  21.     public MyAsyncFunction(Integer maxConnTotal) {
  22.         this.maxConnTotal = maxConnTotal;
  23.     }
  24.     @Override
  25.     public void open(Configuration parameters) throws Exception {
  26.         executorService = Executors.newFixedThreadPool(maxConnTotal);
  27.         phoenixDataSource = getPhoenixDataSource();
  28.     }
  29.     @Override
  30.     public void close() throws Exception {
  31.         executorService.shutdown();
  32.         phoenixDataSource.close();
  33.     }
  34.     @Override
  35.     public void asyncInvoke(JSONObject json, ResultFuture<String> resultFuture) {
  36.         //使用线程池实现异步提交请求
  37.         Future<String> future = executorService.submit(() -> upsertHBase(json));
  38.         CompletableFuture.supplyAsync(() -> {
  39.             try {
  40.                 return future.get();
  41.             } catch (InterruptedException | ExecutionException e) {
  42.                 return null;
  43.             }
  44.         }).thenAccept((String dbResult) -> resultFuture.complete(Collections.singleton(dbResult)));
  45.     }
  46.     /**
  47.      * 标签写入HBase
  48.      *
  49.      * @param json 标签数据
  50.      * @return 标签数据字符串
  51.      * @throws SQLException SQLException
  52.      */
  53.     private String upsertHBase(JSONObject json) throws SQLException {
  54.         Long id = json.getLong("id");
  55.         String key = json.getString("key");
  56.         Integer value = json.getInteger("data");
  57.         String SQL = "UPSERT INTO TEST_TABLE (ID," + key + ") VALUES (" + id + "," + value + ")";
  58.         try (
  59.                 Connection conn = phoenixDataSource.getConnection();
  60.                 PreparedStatement ps = conn.prepareStatement(SQL)
  61.         ) {
  62.             ps.executeUpdate();
  63.             conn.commit();
  64.             return json.toJSONString();
  65.         } catch (Exception e) {
  66.             logger.error(e.getMessage(), e);
  67.             throw e;
  68.         }
  69.     }
  70.     private HikariDataSource getPhoenixDataSource() {
  71.         HikariConfig config = new HikariConfig();
  72.         config.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
  73.         config.setJdbcUrl("jdbc:phoenix:10.xxx.xxx.xxx:2181");
  74.         // 最大活跃连接数
  75.         config.setMaximumPoolSize(50);
  76.         //最小空闲连接数
  77.         config.setMinimumIdle(10);
  78.         //连接超时时间,单位毫秒
  79.         config.setConnectionTimeout(30000);
  80.         //空闲连接最大存活时间,单位毫秒
  81.         config.setIdleTimeout(5 * 60 * 1000);
  82.         //初始化失败时的超时时间,单位秒
  83.         config.setInitializationFailTimeout(1);
  84.         //池名称
  85.         config.setPoolName("PhoenixHikariCP");
  86.         // 允许池在达到最大大小时增长,如果当前所有连接都在使用中,并且执行者正在等待一个连接
  87.         config.setAllowPoolSuspension(true);
  88.         Properties properties = new Properties();
  89.         properties.setProperty("phoenix.schema.mapSystemTablesToNamespace", "true");
  90.         properties.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
  91.         properties.setProperty("phoenix.query.timeoutMs", "1200000");
  92.         properties.setProperty("hbase.rpc.timeout", "1200000");
  93.         properties.setProperty("hbase.client.scanner.timeout.period", "1200000");
  94.         config.setDataSourceProperties(properties);
  95.         return new HikariDataSource(config);
  96.     }
  97. }
复制代码
官方文档

地址:异步 I/O | Apache Flink
   超时处理 #

  当异步 I/O 哀求超时的时候,默认会抛出异常并重启作业。 假如你想处理超时,可以重写 AsyncFunction#timeout 方法。
  结果的顺序 #

  AsyncFunction 发出的并发哀求经常以不确定的顺序完成,这取决于哀求得到响应的顺序。 Flink 提供两种模式控制结果纪录以何种顺序发出。
  

  • 无序模式: 异步哀求一竣事就立刻发出结果纪录。 流中纪录的顺序在颠末异步 I/O 算子之后发生了改变。 当利用 处理时间 作为基本时间特性时,这个模式具有最低的耽误和最少的开销。 此模式利用 AsyncDataStream.unorderedWait(...) 方法。
  • 有序模式: 这种模式保持了流的顺序。发出结果纪录的顺序与触发异步哀求的顺序(纪录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果纪录直到这条纪录前面的全部纪录都发出(或超时)。由于纪录大概结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的耽误和 checkpoint 开销。此模式利用 AsyncDataStream.orderedWait(...) 方法。
  事件时间 #

  当流处理应用利用事件时间时,异步 I/O 算子会精确处理 watermark。对于两种顺序模式,这意味着以下内容:
  

  • 无序模式: Watermark 既不超前于纪录也不落后于纪录,即 watermark 建立了顺序的边界。 只有连续两个 watermark 之间的纪录是无序发出的。 在一个 watermark 背面天生的纪录只会在这个 watermark 发出以后才发出。 在一个 watermark 之前的全部输入的结果纪录全部发出以后,才会发出这个 watermark。
    这意味着存在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的耽误和管理开销。开销巨细取决于 watermark 的频率。
  • 有序模式: 连续两个 watermark 之间的纪录顺序也被保留了。开销与利用处理时间 相比,没有显著的差异。
  请记着,摄入时间 是一种特殊的事件时间,它基于数据源的处理时间自动天生 watermark。
  容错保证 #

  异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步哀求的纪录保存在 checkpoint 中,在故障恢复时重新触发哀求。
  实现提示 #

  在实现利用 Executor(大概 Scala 中的 ExecutionContext)和回调的 Futures 时,建议利用 DirectExecutor,因为通常回调的工作量很小,DirectExecutor 克制了额外的线程切换开销。回调通常只是把结果发送给 ResultFuture,也就是把它添加进输出缓冲。从这里开始,包括发送纪录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。
  DirectExecutor 可以通过 org.apache.flink.util.concurrent.Executors.directExecutor() 或 com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得。
  警告 #

  Flink 不以多线程方式调用 AsyncFunction
  我们想在这里明白指出一个经常肴杂的地方:AsyncFunction 不是以多线程方式调用的。 只有一个 AsyncFunction 实例,它被流中相应分区内的每个纪录顺序地调用。除非 asyncInvoke(...) 方法快速返回并且依靠于(客户端的)回调, 否则无法实现精确的异步 I/O。
  比方,以下情况导致壅闭的 asyncInvoke(...) 函数,从而使异步举动无效:
  

  • 利用同步数据库客户端,它的查询方法调用在返回结果前一直被壅闭。
  • 在 asyncInvoke(...) 方法内壅闭等待异步客户端返回的 future 范例对象
  目前,出于一致性的原因,AsyncFunction 的算子(异步等待算子)必须位于算子链的头部
  根据 FLINK-13063 给出的原因,目前我们必须断开异步等待算子的算子链以防止潜伏的一致性问题。这改变了先前支持的算子链的举动。必要旧有举动并继承可能违背一致性保证的用户可以实例化并手工将异步等待算子添加到作业图中并将链策略设置回通过异步等待算子的 ChainingStrategy.ALWAYS 方法进行链接。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表