大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

打印 上一主题 下一主题

主题 835|帖子 835|积分 2505

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(正在更新…)
章节内容

上节我们完成了如下的内容:


  • Kudu Java API
  • 增删改查 编写案例测试

实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:


  • 环境准备:确保 Flink 和 Kudu 环境正常运行,并设置好相干依赖。
  • 创建 Kudu 表:在 Kudu 中界说要存储的数据表,包括主键和列类型。
  • 数据流计划:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处置惩罚和转换。
  • 写入 Kudu:通过 Kudu 的毗连器将处置惩罚后的数据写入 Kudu 表。需要设置 Kudu 客户端和表的相干信息。
  • 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。
添加依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.example</groupId>
  7.     <artifactId>flink-test</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>11</maven.compiler.source>
  11.         <maven.compiler.target>11</maven.compiler.target>
  12.         <flink.version>1.11.1</flink.version>
  13.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  14.     </properties>
  15.     <dependencies>
  16.         <dependency>
  17.             <groupId>org.apache.flink</groupId>
  18.             <artifactId>flink-java</artifactId>
  19.             <version>${flink.version}</version>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>org.apache.flink</groupId>
  23.             <artifactId>flink-streaming-java_2.12</artifactId>
  24.             <version>${flink.version}</version>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.apache.flink</groupId>
  28.             <artifactId>flink-clients_2.12</artifactId>
  29.             <version>${flink.version}</version>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.apache.kudu</groupId>
  33.             <artifactId>kudu-client</artifactId>
  34.             <version>1.17.0</version>
  35.         </dependency>
  36.     </dependencies>
  37. </project>
复制代码
数据源

  1. new UserInfo("001", "Jack", 18),
  2. new UserInfo("002", "Rose", 20),
  3. new UserInfo("003", "Cris", 22),
  4. new UserInfo("004", "Lily", 19),
  5. new UserInfo("005", "Lucy", 21),
  6. new UserInfo("006", "Json", 24),
复制代码
自界说下沉器

  1. package icu.wzk.kudu;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. import org.apache.kudu.Schema;
  5. import org.apache.kudu.Type;
  6. import org.apache.kudu.client.*;
  7. import org.apache.log4j.Logger;
  8. import java.io.ByteArrayOutputStream;
  9. import java.io.ObjectOutputStream;
  10. import java.util.Map;
  11. public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {
  12.     private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");
  13.     private KuduClient kuduClient;
  14.     private KuduTable kuduTable;
  15.     private String kuduMasterAddr;
  16.     private String tableName;
  17.     private Schema schema;
  18.     private KuduSession kuduSession;
  19.     private ByteArrayOutputStream out;
  20.     private ObjectOutputStream os;
  21.     public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
  22.         this.kuduMasterAddr = kuduMasterAddr;
  23.         this.tableName = tableName;
  24.     }
  25.     @Override
  26.     public void open(Configuration parameters) throws Exception {
  27.         out = new ByteArrayOutputStream();
  28.         os = new ObjectOutputStream(out);
  29.         kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
  30.         kuduTable = kuduClient.openTable(tableName);
  31.         schema = kuduTable.getSchema();
  32.         kuduSession = kuduClient.newSession();
  33.         kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
  34.     }
  35.     @Override
  36.     public void invoke(Map<String, Object> map, Context context) throws Exception {
  37.         if (null == map) {
  38.             return;
  39.         }
  40.         try {
  41.             int columnCount = schema.getColumnCount();
  42.             Insert insert = kuduTable.newInsert();
  43.             PartialRow row = insert.getRow();
  44.             for (int i = 0; i < columnCount; i ++) {
  45.                 Object value = map.get(schema.getColumnByIndex(i).getName());
  46.                 insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
  47.                 OperationResponse response = kuduSession.apply(insert);
  48.                 if (null != response) {
  49.                     logger.error(response.getRowError().toString());
  50.                 }
  51.             }
  52.         } catch (Exception e) {
  53.             logger.error(e);
  54.         }
  55.     }
  56.     @Override
  57.     public void close() throws Exception {
  58.         try {
  59.             kuduSession.close();
  60.             kuduClient.close();
  61.             os.close();
  62.             out.close();
  63.         } catch (Exception e) {
  64.             logger.error(e);
  65.         }
  66.     }
  67.     private void insertData(PartialRow row, Type type, String columnName, Object value) {
  68.         try {
  69.             switch (type) {
  70.                 case STRING:
  71.                     row.addString(columnName, value.toString());
  72.                     return;
  73.                 case INT32:
  74.                     row.addInt(columnName, Integer.valueOf(value.toString()));
  75.                     return;
  76.                 case INT64:
  77.                     row.addLong(columnName, Long.valueOf(value.toString()));
  78.                     return;
  79.                 case DOUBLE:
  80.                     row.addDouble(columnName, Double.valueOf(value.toString()));
  81.                     return;
  82.                 case BOOL:
  83.                     row.addBoolean(columnName, Boolean.valueOf(value.toString()));
  84.                     return;
  85.                 case BINARY:
  86.                     os.writeObject(value);
  87.                     row.addBinary(columnName, out.toByteArray());
  88.                     return;
  89.                 case FLOAT:
  90.                     row.addFloat(columnName, Float.valueOf(value.toString()));
  91.                 default:
  92.                     throw new UnsupportedOperationException("Unknown Type: " + type);
  93.             }
  94.         } catch (Exception e) {
  95.             logger.error("插入数据异常: " + e);
  96.         }
  97.     }
  98. }
复制代码
编写实体

  1. package icu.wzk.kudu;
  2. public class UserInfo {
  3.     private String id;
  4.     private String name;
  5.     private Integer age;
  6.     public UserInfo(String id, String name, Integer age) {
  7.         this.id = id;
  8.         this.name = name;
  9.         this.age = age;
  10.     }
  11.     public String getId() {
  12.         return id;
  13.     }
  14.     public void setId(String id) {
  15.         this.id = id;
  16.     }
  17.     public String getName() {
  18.         return name;
  19.     }
  20.     public void setName(String name) {
  21.         this.name = name;
  22.     }
  23.     public Integer getAge() {
  24.         return age;
  25.     }
  26.     public void setAge(Integer age) {
  27.         this.age = age;
  28.     }
  29. }
复制代码
执行建表

  1. package icu.wzk.kudu;
  2. import org.apache.kudu.ColumnSchema;
  3. import org.apache.kudu.Schema;
  4. import org.apache.kudu.Type;
  5. import org.apache.kudu.client.CreateTableOptions;
  6. import org.apache.kudu.client.KuduClient;
  7. import org.apache.kudu.client.KuduException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class KuduCreateTable {
  11.     public static void main(String[] args) throws KuduException {
  12.         String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
  13.         KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
  14.         KuduClient kuduClient = kuduClientBuilder.build();
  15.         String tableName = "user";
  16.         List<ColumnSchema> columnSchemas = new ArrayList<>();
  17.         ColumnSchema id = new ColumnSchema
  18.                 .ColumnSchemaBuilder("id", Type.INT32)
  19.                 .key(true)
  20.                 .build();
  21.         columnSchemas.add(id);
  22.         ColumnSchema name = new ColumnSchema
  23.                 .ColumnSchemaBuilder("name", Type.STRING)
  24.                 .key(false)
  25.                 .build();
  26.         columnSchemas.add(name);
  27.         ColumnSchema age = new ColumnSchema
  28.                 .ColumnSchemaBuilder("age", Type.INT32)
  29.                 .key(false)
  30.                 .build();
  31.         columnSchemas.add(age);
  32.         Schema schema = new Schema(columnSchemas);
  33.         CreateTableOptions options = new CreateTableOptions();
  34.         // 副本数量为1
  35.         options.setNumReplicas(1);
  36.         List<String> colrule = new ArrayList<>();
  37.         colrule.add("id");
  38.         options.addHashPartitions(colrule, 3);
  39.         kuduClient.createTable(tableName, schema, options);
  40.         kuduClient.close();
  41.     }
  42. }
复制代码
主逻辑代码

  1. package icu.wzk.kudu;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.stream.Stream;
  9. public class SinkToKuduTest {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStreamSource<UserInfo> dataSource = env.fromElements(
  13.                 new UserInfo("001", "Jack", 18),
  14.                 new UserInfo("002", "Rose", 20),
  15.                 new UserInfo("003", "Cris", 22),
  16.                 new UserInfo("004", "Lily", 19),
  17.                 new UserInfo("005", "Lucy", 21),
  18.                 new UserInfo("006", "Json", 24)
  19.         );
  20.         SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
  21.                 .map(new MapFunction<UserInfo, Map<String, Object>>() {
  22.                     @Override
  23.                     public Map<String, Object> map(UserInfo value) throws Exception {
  24.                         Map<String, Object> map = new HashMap<>();
  25.                         map.put("id", value.getId());
  26.                         map.put("name", value.getName());
  27.                         map.put("age", value.getAge());
  28.                         return map;
  29.                     }
  30.                 });
  31.         String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
  32.         String tableInfo = "user";
  33.         mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
  34.         env.execute("SinkToKuduTest");
  35.     }
  36. }
复制代码
解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。
数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包罗多个 UserInfo 对象的数据源,模拟了一个输入流。
数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处置惩罚和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。
Kudu 设置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:界说 Kudu 的主节点地点和目标表的信息。
数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自界说 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。
执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处置惩罚流程。
测试运行



  • 先运行建表
  • 再运行主逻辑
我们建表之后,确认user表存在。然后我们运行Flink步伐,将数据写入Kudu。

确认有表后,执行 Flink 步伐:

注意事项



  • 并发性:根据 Kudu 集群的规模和设置,可以调解 Flink 作业的并发性,以进步写入性能。
  • 批量写入:Kudu 支持批量插入,可以通过适当设置 Flink 的 sink 来进步性能。
  • 故障处置惩罚:确保在作业中处置惩罚异常和重试逻辑,以确保数据不会丢失。
  • 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表