点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(正在更新…)
章节内容
上节我们完成了如下的内容:
- Kudu Java API
- 增删改查 编写案例测试
实现思路
将数据从 Flink 下沉到 Kudu 的基本思路如下:
- 环境准备:确保 Flink 和 Kudu 环境正常运行,并设置好相干依赖。
- 创建 Kudu 表:在 Kudu 中界说要存储的数据表,包括主键和列类型。
- 数据流计划:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处置惩罚和转换。
- 写入 Kudu:通过 Kudu 的毗连器将处置惩罚后的数据写入 Kudu 表。需要设置 Kudu 客户端和表的相干信息。
- 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。
添加依赖
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.example</groupId>
- <artifactId>flink-test</artifactId>
- <version>1.0-SNAPSHOT</version>
- <properties>
- <maven.compiler.source>11</maven.compiler.source>
- <maven.compiler.target>11</maven.compiler.target>
- <flink.version>1.11.1</flink.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kudu</groupId>
- <artifactId>kudu-client</artifactId>
- <version>1.17.0</version>
- </dependency>
- </dependencies>
- </project>
复制代码 数据源
- new UserInfo("001", "Jack", 18),
- new UserInfo("002", "Rose", 20),
- new UserInfo("003", "Cris", 22),
- new UserInfo("004", "Lily", 19),
- new UserInfo("005", "Lucy", 21),
- new UserInfo("006", "Json", 24),
复制代码 自界说下沉器
- package icu.wzk.kudu;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.kudu.Schema;
- import org.apache.kudu.Type;
- import org.apache.kudu.client.*;
- import org.apache.log4j.Logger;
- import java.io.ByteArrayOutputStream;
- import java.io.ObjectOutputStream;
- import java.util.Map;
- public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {
- private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");
- private KuduClient kuduClient;
- private KuduTable kuduTable;
- private String kuduMasterAddr;
- private String tableName;
- private Schema schema;
- private KuduSession kuduSession;
- private ByteArrayOutputStream out;
- private ObjectOutputStream os;
- public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
- this.kuduMasterAddr = kuduMasterAddr;
- this.tableName = tableName;
- }
- @Override
- public void open(Configuration parameters) throws Exception {
- out = new ByteArrayOutputStream();
- os = new ObjectOutputStream(out);
- kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
- kuduTable = kuduClient.openTable(tableName);
- schema = kuduTable.getSchema();
- kuduSession = kuduClient.newSession();
- kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
- }
- @Override
- public void invoke(Map<String, Object> map, Context context) throws Exception {
- if (null == map) {
- return;
- }
- try {
- int columnCount = schema.getColumnCount();
- Insert insert = kuduTable.newInsert();
- PartialRow row = insert.getRow();
- for (int i = 0; i < columnCount; i ++) {
- Object value = map.get(schema.getColumnByIndex(i).getName());
- insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
- OperationResponse response = kuduSession.apply(insert);
- if (null != response) {
- logger.error(response.getRowError().toString());
- }
- }
- } catch (Exception e) {
- logger.error(e);
- }
- }
- @Override
- public void close() throws Exception {
- try {
- kuduSession.close();
- kuduClient.close();
- os.close();
- out.close();
- } catch (Exception e) {
- logger.error(e);
- }
- }
- private void insertData(PartialRow row, Type type, String columnName, Object value) {
- try {
- switch (type) {
- case STRING:
- row.addString(columnName, value.toString());
- return;
- case INT32:
- row.addInt(columnName, Integer.valueOf(value.toString()));
- return;
- case INT64:
- row.addLong(columnName, Long.valueOf(value.toString()));
- return;
- case DOUBLE:
- row.addDouble(columnName, Double.valueOf(value.toString()));
- return;
- case BOOL:
- row.addBoolean(columnName, Boolean.valueOf(value.toString()));
- return;
- case BINARY:
- os.writeObject(value);
- row.addBinary(columnName, out.toByteArray());
- return;
- case FLOAT:
- row.addFloat(columnName, Float.valueOf(value.toString()));
- default:
- throw new UnsupportedOperationException("Unknown Type: " + type);
- }
- } catch (Exception e) {
- logger.error("插入数据异常: " + e);
- }
- }
- }
复制代码 编写实体
- package icu.wzk.kudu;
- public class UserInfo {
- private String id;
- private String name;
- private Integer age;
- public UserInfo(String id, String name, Integer age) {
- this.id = id;
- this.name = name;
- this.age = age;
- }
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public Integer getAge() {
- return age;
- }
- public void setAge(Integer age) {
- this.age = age;
- }
- }
复制代码 执行建表
- package icu.wzk.kudu;
- import org.apache.kudu.ColumnSchema;
- import org.apache.kudu.Schema;
- import org.apache.kudu.Type;
- import org.apache.kudu.client.CreateTableOptions;
- import org.apache.kudu.client.KuduClient;
- import org.apache.kudu.client.KuduException;
- import java.util.ArrayList;
- import java.util.List;
- public class KuduCreateTable {
- public static void main(String[] args) throws KuduException {
- String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
- KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
- KuduClient kuduClient = kuduClientBuilder.build();
- String tableName = "user";
- List<ColumnSchema> columnSchemas = new ArrayList<>();
- ColumnSchema id = new ColumnSchema
- .ColumnSchemaBuilder("id", Type.INT32)
- .key(true)
- .build();
- columnSchemas.add(id);
- ColumnSchema name = new ColumnSchema
- .ColumnSchemaBuilder("name", Type.STRING)
- .key(false)
- .build();
- columnSchemas.add(name);
- ColumnSchema age = new ColumnSchema
- .ColumnSchemaBuilder("age", Type.INT32)
- .key(false)
- .build();
- columnSchemas.add(age);
- Schema schema = new Schema(columnSchemas);
- CreateTableOptions options = new CreateTableOptions();
- // 副本数量为1
- options.setNumReplicas(1);
- List<String> colrule = new ArrayList<>();
- colrule.add("id");
- options.addHashPartitions(colrule, 3);
- kuduClient.createTable(tableName, schema, options);
- kuduClient.close();
- }
- }
复制代码 主逻辑代码
- package icu.wzk.kudu;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.stream.Stream;
- public class SinkToKuduTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<UserInfo> dataSource = env.fromElements(
- new UserInfo("001", "Jack", 18),
- new UserInfo("002", "Rose", 20),
- new UserInfo("003", "Cris", 22),
- new UserInfo("004", "Lily", 19),
- new UserInfo("005", "Lucy", 21),
- new UserInfo("006", "Json", 24)
- );
- SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
- .map(new MapFunction<UserInfo, Map<String, Object>>() {
- @Override
- public Map<String, Object> map(UserInfo value) throws Exception {
- Map<String, Object> map = new HashMap<>();
- map.put("id", value.getId());
- map.put("name", value.getName());
- map.put("age", value.getAge());
- return map;
- }
- });
- String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
- String tableInfo = "user";
- mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
- env.execute("SinkToKuduTest");
- }
- }
复制代码 解释分析
环境设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。
数据源创建
DataStreamSource dataSource = env.fromElements(…):创建了一个包罗多个 UserInfo 对象的数据源,模拟了一个输入流。
数据转换
SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处置惩罚和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。
Kudu 设置信息
String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:界说 Kudu 的主节点地点和目标表的信息。
数据下沉
mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自界说 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。
执行作业
env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处置惩罚流程。
测试运行
我们建表之后,确认user表存在。然后我们运行Flink步伐,将数据写入Kudu。
确认有表后,执行 Flink 步伐:
注意事项
- 并发性:根据 Kudu 集群的规模和设置,可以调解 Flink 作业的并发性,以进步写入性能。
- 批量写入:Kudu 支持批量插入,可以通过适当设置 Flink 的 sink 来进步性能。
- 故障处置惩罚:确保在作业中处置惩罚异常和重试逻辑,以确保数据不会丢失。
- 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |