ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu [打印本页]

作者: 络腮胡菲菲    时间: 2024-10-7 20:30
标题: 大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:


章节内容

上节我们完成了如下的内容:


实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:

添加依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.example</groupId>
  7.     <artifactId>flink-test</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>11</maven.compiler.source>
  11.         <maven.compiler.target>11</maven.compiler.target>
  12.         <flink.version>1.11.1</flink.version>
  13.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  14.     </properties>
  15.     <dependencies>
  16.         <dependency>
  17.             <groupId>org.apache.flink</groupId>
  18.             <artifactId>flink-java</artifactId>
  19.             <version>${flink.version}</version>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>org.apache.flink</groupId>
  23.             <artifactId>flink-streaming-java_2.12</artifactId>
  24.             <version>${flink.version}</version>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.apache.flink</groupId>
  28.             <artifactId>flink-clients_2.12</artifactId>
  29.             <version>${flink.version}</version>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.apache.kudu</groupId>
  33.             <artifactId>kudu-client</artifactId>
  34.             <version>1.17.0</version>
  35.         </dependency>
  36.     </dependencies>
  37. </project>
复制代码
数据源

  1. new UserInfo("001", "Jack", 18),
  2. new UserInfo("002", "Rose", 20),
  3. new UserInfo("003", "Cris", 22),
  4. new UserInfo("004", "Lily", 19),
  5. new UserInfo("005", "Lucy", 21),
  6. new UserInfo("006", "Json", 24),
复制代码
自界说下沉器

  1. package icu.wzk.kudu;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. import org.apache.kudu.Schema;
  5. import org.apache.kudu.Type;
  6. import org.apache.kudu.client.*;
  7. import org.apache.log4j.Logger;
  8. import java.io.ByteArrayOutputStream;
  9. import java.io.ObjectOutputStream;
  10. import java.util.Map;
  11. public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {
  12.     private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");
  13.     private KuduClient kuduClient;
  14.     private KuduTable kuduTable;
  15.     private String kuduMasterAddr;
  16.     private String tableName;
  17.     private Schema schema;
  18.     private KuduSession kuduSession;
  19.     private ByteArrayOutputStream out;
  20.     private ObjectOutputStream os;
  21.     public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
  22.         this.kuduMasterAddr = kuduMasterAddr;
  23.         this.tableName = tableName;
  24.     }
  25.     @Override
  26.     public void open(Configuration parameters) throws Exception {
  27.         out = new ByteArrayOutputStream();
  28.         os = new ObjectOutputStream(out);
  29.         kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
  30.         kuduTable = kuduClient.openTable(tableName);
  31.         schema = kuduTable.getSchema();
  32.         kuduSession = kuduClient.newSession();
  33.         kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
  34.     }
  35.     @Override
  36.     public void invoke(Map<String, Object> map, Context context) throws Exception {
  37.         if (null == map) {
  38.             return;
  39.         }
  40.         try {
  41.             int columnCount = schema.getColumnCount();
  42.             Insert insert = kuduTable.newInsert();
  43.             PartialRow row = insert.getRow();
  44.             for (int i = 0; i < columnCount; i ++) {
  45.                 Object value = map.get(schema.getColumnByIndex(i).getName());
  46.                 insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
  47.                 OperationResponse response = kuduSession.apply(insert);
  48.                 if (null != response) {
  49.                     logger.error(response.getRowError().toString());
  50.                 }
  51.             }
  52.         } catch (Exception e) {
  53.             logger.error(e);
  54.         }
  55.     }
  56.     @Override
  57.     public void close() throws Exception {
  58.         try {
  59.             kuduSession.close();
  60.             kuduClient.close();
  61.             os.close();
  62.             out.close();
  63.         } catch (Exception e) {
  64.             logger.error(e);
  65.         }
  66.     }
  67.     private void insertData(PartialRow row, Type type, String columnName, Object value) {
  68.         try {
  69.             switch (type) {
  70.                 case STRING:
  71.                     row.addString(columnName, value.toString());
  72.                     return;
  73.                 case INT32:
  74.                     row.addInt(columnName, Integer.valueOf(value.toString()));
  75.                     return;
  76.                 case INT64:
  77.                     row.addLong(columnName, Long.valueOf(value.toString()));
  78.                     return;
  79.                 case DOUBLE:
  80.                     row.addDouble(columnName, Double.valueOf(value.toString()));
  81.                     return;
  82.                 case BOOL:
  83.                     row.addBoolean(columnName, Boolean.valueOf(value.toString()));
  84.                     return;
  85.                 case BINARY:
  86.                     os.writeObject(value);
  87.                     row.addBinary(columnName, out.toByteArray());
  88.                     return;
  89.                 case FLOAT:
  90.                     row.addFloat(columnName, Float.valueOf(value.toString()));
  91.                 default:
  92.                     throw new UnsupportedOperationException("Unknown Type: " + type);
  93.             }
  94.         } catch (Exception e) {
  95.             logger.error("插入数据异常: " + e);
  96.         }
  97.     }
  98. }
复制代码
编写实体

  1. package icu.wzk.kudu;
  2. public class UserInfo {
  3.     private String id;
  4.     private String name;
  5.     private Integer age;
  6.     public UserInfo(String id, String name, Integer age) {
  7.         this.id = id;
  8.         this.name = name;
  9.         this.age = age;
  10.     }
  11.     public String getId() {
  12.         return id;
  13.     }
  14.     public void setId(String id) {
  15.         this.id = id;
  16.     }
  17.     public String getName() {
  18.         return name;
  19.     }
  20.     public void setName(String name) {
  21.         this.name = name;
  22.     }
  23.     public Integer getAge() {
  24.         return age;
  25.     }
  26.     public void setAge(Integer age) {
  27.         this.age = age;
  28.     }
  29. }
复制代码
执行建表

  1. package icu.wzk.kudu;
  2. import org.apache.kudu.ColumnSchema;
  3. import org.apache.kudu.Schema;
  4. import org.apache.kudu.Type;
  5. import org.apache.kudu.client.CreateTableOptions;
  6. import org.apache.kudu.client.KuduClient;
  7. import org.apache.kudu.client.KuduException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class KuduCreateTable {
  11.     public static void main(String[] args) throws KuduException {
  12.         String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
  13.         KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
  14.         KuduClient kuduClient = kuduClientBuilder.build();
  15.         String tableName = "user";
  16.         List<ColumnSchema> columnSchemas = new ArrayList<>();
  17.         ColumnSchema id = new ColumnSchema
  18.                 .ColumnSchemaBuilder("id", Type.INT32)
  19.                 .key(true)
  20.                 .build();
  21.         columnSchemas.add(id);
  22.         ColumnSchema name = new ColumnSchema
  23.                 .ColumnSchemaBuilder("name", Type.STRING)
  24.                 .key(false)
  25.                 .build();
  26.         columnSchemas.add(name);
  27.         ColumnSchema age = new ColumnSchema
  28.                 .ColumnSchemaBuilder("age", Type.INT32)
  29.                 .key(false)
  30.                 .build();
  31.         columnSchemas.add(age);
  32.         Schema schema = new Schema(columnSchemas);
  33.         CreateTableOptions options = new CreateTableOptions();
  34.         // 副本数量为1
  35.         options.setNumReplicas(1);
  36.         List<String> colrule = new ArrayList<>();
  37.         colrule.add("id");
  38.         options.addHashPartitions(colrule, 3);
  39.         kuduClient.createTable(tableName, schema, options);
  40.         kuduClient.close();
  41.     }
  42. }
复制代码
主逻辑代码

  1. package icu.wzk.kudu;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.stream.Stream;
  9. public class SinkToKuduTest {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStreamSource<UserInfo> dataSource = env.fromElements(
  13.                 new UserInfo("001", "Jack", 18),
  14.                 new UserInfo("002", "Rose", 20),
  15.                 new UserInfo("003", "Cris", 22),
  16.                 new UserInfo("004", "Lily", 19),
  17.                 new UserInfo("005", "Lucy", 21),
  18.                 new UserInfo("006", "Json", 24)
  19.         );
  20.         SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
  21.                 .map(new MapFunction<UserInfo, Map<String, Object>>() {
  22.                     @Override
  23.                     public Map<String, Object> map(UserInfo value) throws Exception {
  24.                         Map<String, Object> map = new HashMap<>();
  25.                         map.put("id", value.getId());
  26.                         map.put("name", value.getName());
  27.                         map.put("age", value.getAge());
  28.                         return map;
  29.                     }
  30.                 });
  31.         String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
  32.         String tableInfo = "user";
  33.         mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
  34.         env.execute("SinkToKuduTest");
  35.     }
  36. }
复制代码
解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。
数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包罗多个 UserInfo 对象的数据源,模拟了一个输入流。
数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处置惩罚和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。
Kudu 设置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:界说 Kudu 的主节点地点和目标表的信息。
数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自界说 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。
执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处置惩罚流程。
测试运行


我们建表之后,确认user表存在。然后我们运行Flink步伐,将数据写入Kudu。

确认有表后,执行 Flink 步伐:

注意事项



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4