ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink-异步算子AsyncFunction利用 [打印本页]

作者: 王國慶    时间: 2024-9-22 18:37
标题: Flink-异步算子AsyncFunction利用
配景

        在Flink使命中必要将盘算结果数据先存入HBase中(Phoenix),在保证HBase写入成功后再转发到Kafka中。在数据量较大的情况下由于串行写入HBase会出现严峻反压,造成整体数据盘算链路的整体数据产出效率大大低落。
目标

        利用异步算子与外部系统交互,提高吞吐量,低落由于写入数据库造成的耽误。
描述

        在本地测试时验证出,一条数据写入的HBase的哀求完备耗时在40ms左右(仅作为参考,不怜悯况耗时差异较大),但是在Flink使命中上游算子的耗时基本都在几毫秒内,因此写入HBase是整体链路的显着短板。在一个写入HBase的Task中,由于写入哀求是同步执行的,在大部分的情况下等待哀求相应占据了该算子的大部分时间,因此通过并发的实现处理多个哀求和接收多个响应,将等待的时间分摊到每个哀求。
            注:仅仅提高Function的并行度(parallelism)在有些情况下也可以提升吞吐量,但是如许做通常会导致非常高的资源斲丧:更多的并行 Function 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络毗连、 更多的与数据库的网络毗连、更多的缓冲和更多程序内部协调的开销。
                                                                                                  --Apache Flink Documentation
  
两个参数控制异步操作:
Timeout: 超时参数界说了异步哀求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的哀求。
Capacity: 容量参数界说了可以同时进行的异步哀求数。 即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 限定并发哀求的数量可以确保算子不会连续累积待处理的哀求进而造成积压,而是在容量耗尽时触发反压。
留意点:

代码

  1. package app;
  2. import com.alibaba.fastjson.JSONObject;
  3. import functions.MyAsyncFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import java.util.concurrent.TimeUnit;
  9. public class AsyncTestJob {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         SingleOutputStreamOperator<JSONObject> stream = env.socketTextStream("localhost", 8888)
  13.                 .map((MapFunction<String, JSONObject>) JSONObject::parseObject);
  14.         
  15.         AsyncDataStream.orderedWait(stream,
  16.                 //使用异步算子
  17.                 new MyAsyncFunction(50),
  18.                 //超时时间
  19.                 30000,
  20.                 //超时时间单位
  21.                 TimeUnit.MILLISECONDS,
  22.                 //capacity为异步请求数参数。建议:该参数应小于等于线程池、连接池的最大连接数
  23.                 50)
  24.                 .print();
  25.         env.execute();
  26.     }
  27. }
复制代码
  1. package functions;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zaxxer.hikari.HikariConfig;
  4. import com.zaxxer.hikari.HikariDataSource;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.functions.async.ResultFuture;
  7. import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.sql.Connection;
  11. import java.sql.PreparedStatement;
  12. import java.sql.SQLException;
  13. import java.util.Collections;
  14. import java.util.Properties;
  15. import java.util.concurrent.*;
  16. public class MyAsyncFunction extends RichAsyncFunction<JSONObject, String> {
  17.     private final int maxConnTotal;
  18.     private transient ExecutorService executorService;
  19.     private HikariDataSource phoenixDataSource;
  20.     private final static Logger logger = LoggerFactory.getLogger(AsyncUpsertHBase.class);
  21.     public MyAsyncFunction(Integer maxConnTotal) {
  22.         this.maxConnTotal = maxConnTotal;
  23.     }
  24.     @Override
  25.     public void open(Configuration parameters) throws Exception {
  26.         executorService = Executors.newFixedThreadPool(maxConnTotal);
  27.         phoenixDataSource = getPhoenixDataSource();
  28.     }
  29.     @Override
  30.     public void close() throws Exception {
  31.         executorService.shutdown();
  32.         phoenixDataSource.close();
  33.     }
  34.     @Override
  35.     public void asyncInvoke(JSONObject json, ResultFuture<String> resultFuture) {
  36.         //使用线程池实现异步提交请求
  37.         Future<String> future = executorService.submit(() -> upsertHBase(json));
  38.         CompletableFuture.supplyAsync(() -> {
  39.             try {
  40.                 return future.get();
  41.             } catch (InterruptedException | ExecutionException e) {
  42.                 return null;
  43.             }
  44.         }).thenAccept((String dbResult) -> resultFuture.complete(Collections.singleton(dbResult)));
  45.     }
  46.     /**
  47.      * 标签写入HBase
  48.      *
  49.      * @param json 标签数据
  50.      * @return 标签数据字符串
  51.      * @throws SQLException SQLException
  52.      */
  53.     private String upsertHBase(JSONObject json) throws SQLException {
  54.         Long id = json.getLong("id");
  55.         String key = json.getString("key");
  56.         Integer value = json.getInteger("data");
  57.         String SQL = "UPSERT INTO TEST_TABLE (ID," + key + ") VALUES (" + id + "," + value + ")";
  58.         try (
  59.                 Connection conn = phoenixDataSource.getConnection();
  60.                 PreparedStatement ps = conn.prepareStatement(SQL)
  61.         ) {
  62.             ps.executeUpdate();
  63.             conn.commit();
  64.             return json.toJSONString();
  65.         } catch (Exception e) {
  66.             logger.error(e.getMessage(), e);
  67.             throw e;
  68.         }
  69.     }
  70.     private HikariDataSource getPhoenixDataSource() {
  71.         HikariConfig config = new HikariConfig();
  72.         config.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
  73.         config.setJdbcUrl("jdbc:phoenix:10.xxx.xxx.xxx:2181");
  74.         // 最大活跃连接数
  75.         config.setMaximumPoolSize(50);
  76.         //最小空闲连接数
  77.         config.setMinimumIdle(10);
  78.         //连接超时时间,单位毫秒
  79.         config.setConnectionTimeout(30000);
  80.         //空闲连接最大存活时间,单位毫秒
  81.         config.setIdleTimeout(5 * 60 * 1000);
  82.         //初始化失败时的超时时间,单位秒
  83.         config.setInitializationFailTimeout(1);
  84.         //池名称
  85.         config.setPoolName("PhoenixHikariCP");
  86.         // 允许池在达到最大大小时增长,如果当前所有连接都在使用中,并且执行者正在等待一个连接
  87.         config.setAllowPoolSuspension(true);
  88.         Properties properties = new Properties();
  89.         properties.setProperty("phoenix.schema.mapSystemTablesToNamespace", "true");
  90.         properties.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
  91.         properties.setProperty("phoenix.query.timeoutMs", "1200000");
  92.         properties.setProperty("hbase.rpc.timeout", "1200000");
  93.         properties.setProperty("hbase.client.scanner.timeout.period", "1200000");
  94.         config.setDataSourceProperties(properties);
  95.         return new HikariDataSource(config);
  96.     }
  97. }
复制代码
官方文档

地址:异步 I/O | Apache Flink
   超时处理 #

  当异步 I/O 哀求超时的时候,默认会抛出异常并重启作业。 假如你想处理超时,可以重写 AsyncFunction#timeout 方法。
  结果的顺序 #

  AsyncFunction 发出的并发哀求经常以不确定的顺序完成,这取决于哀求得到响应的顺序。 Flink 提供两种模式控制结果纪录以何种顺序发出。
  
  事件时间 #

  当流处理应用利用事件时间时,异步 I/O 算子会精确处理 watermark。对于两种顺序模式,这意味着以下内容:
  
  请记着,摄入时间 是一种特殊的事件时间,它基于数据源的处理时间自动天生 watermark。
  容错保证 #

  异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步哀求的纪录保存在 checkpoint 中,在故障恢复时重新触发哀求。
  实现提示 #

  在实现利用 Executor(大概 Scala 中的 ExecutionContext)和回调的 Futures 时,建议利用 DirectExecutor,因为通常回调的工作量很小,DirectExecutor 克制了额外的线程切换开销。回调通常只是把结果发送给 ResultFuture,也就是把它添加进输出缓冲。从这里开始,包括发送纪录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。
  DirectExecutor 可以通过 org.apache.flink.util.concurrent.Executors.directExecutor() 或 com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得。
  警告 #

  Flink 不以多线程方式调用 AsyncFunction
  我们想在这里明白指出一个经常肴杂的地方:AsyncFunction 不是以多线程方式调用的。 只有一个 AsyncFunction 实例,它被流中相应分区内的每个纪录顺序地调用。除非 asyncInvoke(...) 方法快速返回并且依靠于(客户端的)回调, 否则无法实现精确的异步 I/O。
  比方,以下情况导致壅闭的 asyncInvoke(...) 函数,从而使异步举动无效:
  
  目前,出于一致性的原因,AsyncFunction 的算子(异步等待算子)必须位于算子链的头部
  根据 FLINK-13063 给出的原因,目前我们必须断开异步等待算子的算子链以防止潜伏的一致性问题。这改变了先前支持的算子链的举动。必要旧有举动并继承可能违背一致性保证的用户可以实例化并手工将异步等待算子添加到作业图中并将链策略设置回通过异步等待算子的 ChainingStrategy.ALWAYS 方法进行链接。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4