1. Introduction
Flink version: 1.14.6
MySQL version: 8.0
JDK version: 1.8
The example implements the following pipeline:
Simulated message generator -> Kafka -> Flink -> MySQL
Flink collects the data stream and writes it to MySQL in timed batches.
IntelliJ IDEA is used as the development IDE.
The overall project structure is shown in the figure below:
2. Project code (published on Gitee)
flink-kafka-mysql: Flink reads Kafka messages and batch-writes them to MySQL 8.0
3. Project code
POM file:
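The full pom.xml lives in the Gitee repository; as a rough guide, a Flink 1.14.6 / JDK 1.8 build needs at least the dependencies sketched below. The exact versions, the Druid artifact, and the SLF4J binding are assumptions rather than values copied from the project.

```xml
<!-- Sketch of the key dependencies; adjust versions to match the repository -->
<dependencies>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.6</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.6</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.6</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.6</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.6</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.20</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.36</version></dependency>
    <!-- a JSON library (e.g. jackson-databind or fastjson) is also needed for parsing the Kafka messages -->
</dependencies>
```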
application-dev.properties configuration file:
```properties
# mysql-jdbc
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
jdbc.username=root
jdbc.password=root
jdbc.druidInitialSize=10
jdbc.druidMaxActive=20
jdbc.druidMinIdle=5
jdbc.druidMaxWait=20000
jdbc.druidTimeBetweenEvictionRunsMillis=60000
jdbc.druidMaxEvictableIdleTimeMillis=3600000
jdbc.druidMinEvictableIdleTimeMillis=3240000
jdbc.druidTestWhileIdle=true
jdbc.druidTestOnBorrow=true
jdbc.druidTestOnReturn=false
jdbc.druidPoolPreparedStatements=true
jdbc.druidMaxPoolPreparedStatementPerConnectionSize=10
jdbc.druidFilters=stat,slf4j
jdbc.druidValidationQuery=select 1
# kafka
kafka.servers=127.0.0.1:9092
kafka.consumer.groupId=kafka.consumer.group
kafka.auto.offset.reset=latest
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.user.event.topic=testTopic
```
MySQL table DDL:
```sql
CREATE TABLE `student` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name1` varchar(50),
  `name2` varchar(50),
  `name3` varchar(50),
  `name4` varchar(50),
  `name5` varchar(50),
  `name6` varchar(50),
  `name7` varchar(50),
  `name8` varchar(50),
  `name9` varchar(50),
  `name10` varchar(50),
  `name11` varchar(50),
  `name12` varchar(50),
  `name13` varchar(50),
  `name14` varchar(50),
  `name15` varchar(50),
  `name16` varchar(50),
  `name17` varchar(50),
  `name18` varchar(50),
  `name19` varchar(50),
  `name20` varchar(50),
  `name21` varchar(50),
  `name22` varchar(50),
  `name23` varchar(50),
  `name24` varchar(50),
  `name25` varchar(50),
  `name26` varchar(50),
  `name27` varchar(50),
  `name28` varchar(50),
  `name29` varchar(50),
  `name30` varchar(50),
  `name31` varchar(50),
  `name32` varchar(50),
  `name33` varchar(50),
  `name34` varchar(50),
  `name35` varchar(50),
  `name36` varchar(50),
  `name37` varchar(50),
  `name38` varchar(50),
  `name39` varchar(50),
  `name40` varchar(50),
  `name41` varchar(50),
  `name42` varchar(50),
  `name43` varchar(50),
  `name44` varchar(50),
  `name45` varchar(50),
  `name46` varchar(50),
  `name47` varchar(50),
  `name48` varchar(50),
  `name49` varchar(50),
  `name50` varchar(50),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='学生表';
```
Create the Student entity class:
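The entity is a plain POJO mirroring the table above. A minimal sketch is shown here; only the first two of the name1..name50 fields are written out, the real class repeats the same pattern (field plus getter/setter) through name50.

```java
package com.slink.entity;

import java.io.Serializable;

// Sketch of the Student entity: one String field per MySQL column name1..name50.
public class Student implements Serializable {
    private String name1;
    private String name2;
    // ... name3 through name50 are declared the same way, each with a getter and a setter

    public String getName1() { return name1; }
    public void setName1(String name1) { this.name1 = name1; }
    public String getName2() { return name2; }
    public void setName2(String name2) { this.name2 = name2; }
}
```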
Create the SinkLog entity that receives the Kafka message:
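The exact implementation is in the repository; what the rest of the code requires is that SinkLog.build(String) turns the raw access-log JSON into a Student (the fields sit under requestBody) and returns null when parsing fails, so the downstream filter can drop bad records. A minimal sketch, assuming Jackson as the JSON library:

```java
package com.slink.entity;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch: parses the access-log style message and extracts the "requestBody" object as a Student.
public class SinkLog {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Student build(String value) {
        try {
            JsonNode root = MAPPER.readTree(value);
            JsonNode body = root.get("requestBody");
            if (body == null || body.isNull()) {
                return null;
            }
            return MAPPER.treeToValue(body, Student.class);
        } catch (Exception e) {
            // Malformed message: return null so the downstream filter drops it
            return null;
        }
    }
}
```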
Create the SinkProperties class and methods for reading the configuration file:
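Its shape follows from how it is used later: create() loads application-dev.properties and the getters expose the JDBC and Kafka keys. A minimal sketch, assuming the file is read from the classpath:

```java
package com.slink.properties;

import java.io.InputStream;
import java.io.Serializable;
import java.util.Properties;

// Sketch: loads application-dev.properties from the classpath and exposes typed getters.
public class SinkProperties implements Serializable {

    private final Properties props = new Properties();

    public void create() throws Exception {
        try (InputStream in = SinkProperties.class.getClassLoader()
                .getResourceAsStream("application-dev.properties")) {
            props.load(in);
        }
    }

    public String getJdbcDriver() { return props.getProperty("jdbc.driver"); }
    public String getJdbcUrl() { return props.getProperty("jdbc.url"); }
    public String getJdbcUsername() { return props.getProperty("jdbc.username"); }
    public String getJdbcPassword() { return props.getProperty("jdbc.password"); }
    public String getKafkaServer() { return props.getProperty("kafka.servers"); }
    public String getKafkaConsumerGroupId() { return props.getProperty("kafka.consumer.groupId"); }
    public String getKafkaAutoOffsetReset() { return props.getProperty("kafka.auto.offset.reset"); }
    public String getKafkaUserEventTopic() { return props.getProperty("kafka.user.event.topic"); }
}
```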
Main class
It reads the Kafka messages, uses a map function with JSON parsing to turn each message into a Student data stream, aggregates the Students in a 10-second tumbling window, and finally hands them to the JDBC sink that writes to MySQL.
```java
package com.slink;

import com.slink.entity.Student;
import com.slink.function.StudentProcessWindowFunction;
import com.slink.properties.SinkProperties;
import com.slink.util.Constant;
import com.slink.util.KafkaSourceUtil;
import com.slink.util.SinkFunctionUtil;
import com.slink.util.StreamExecutionEnvironmentUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StudentRunner {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.buildStreamExecutionEnvironment();
        // Load the JDBC and Kafka settings
        SinkProperties properties = new SinkProperties();
        properties.create();
        // Build the Kafka source
        DataStream<Student> kafkaOut = KafkaSourceUtil.buildDataStream(properties, env, Constant.TWO);
        // Build the SinkFunction
        SinkFunction<Student> sinkFunction = SinkFunctionUtil.buildSinkFunction(properties);
        /**
         * Aggregate the cleaned data in a window (a 10-second tumbling window) and write it to MySQL.
         * The sink parallelism can be set to 2 (setParallelism(2)), meaning two concurrent tasks write
         * to MySQL. Since MySQL writes are mostly disk I/O, a low parallelism avoids I/O contention.
         */
        // Global 10-second tumbling window
        kafkaOut.rebalance().windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new StudentProcessWindowFunction())
                .disableChaining()
                .addSink(sinkFunction)
                .name("KafkaToMysql");
                // .setParallelism(2);
        kafkaOut.print(); // debug output
        env.execute("flink kafka to Mysql");
    }
}
```
StreamExecutionEnvironmentUtil:
```java
package com.slink.util;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamExecutionEnvironmentUtil {

    private static final String statebackend_address = "file:/Users/xumingzhong/Desktop/xmz";

    public static StreamExecutionEnvironment buildStreamExecutionEnvironment() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /**
         * Setting the global (default) parallelism applies to every operator. It is usually not set
         * in code, because hard-coding it prevents scaling the job out later.
         * Note: keyBy is not an operator, so no parallelism can be set on it.
         */
        // env.setParallelism(1);
        // Trigger a checkpoint every 10 s (checkpoint interval)
        env.enableCheckpointing(10000);
        // EXACTLY_ONCE semantics (this is already the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 1 s between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must complete within 60 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Only one checkpoint may be in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep the checkpoint data when the job is cancelled, so the job can later be restored from it
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend: checkpoints are kept in memory by default; they can be written to HDFS,
        // here a local path would be used instead
        // env.setStateBackend(new FsStateBackend(statebackend_address));
        return env;
    }
}
```
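The commented-out FsStateBackend is the legacy API; on Flink 1.14 the recommended equivalent splits the state backend from the checkpoint storage. A sketch of that variant (the checkpoint path is just a placeholder):

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void configure(StreamExecutionEnvironment env) {
        // Working state lives on the JVM heap ...
        env.setStateBackend(new HashMapStateBackend());
        // ... while checkpoint snapshots go to a file system path (local placeholder here; HDFS/S3 in production)
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
    }
}
```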
SinkFunctionUtil:
```java
package com.slink.util;

import com.slink.entity.Student;
import com.slink.properties.SinkProperties;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;

public class SinkFunctionUtil {
    /**
     * Insert statement for the student table
     */
    private static final String STUDENT_INSERT = "insert into student(name1, name2, name3, name4, name5, name6, name7, name8, name9, name10, name11, name12, name13, name14, name15, name16, name17, name18, name19, name20, name21, name22, name23, name24, name25, name26, name27, name28, name29, name30, name31, name32, name33, name34, name35, name36, name37, name38, name39, name40, name41, name42, name43, name44, name45, name46, name47, name48, name49, name50) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    /**
     * Build the SinkFunction
     */
    public static SinkFunction<Student> buildSinkFunction(SinkProperties properties) {
        return JdbcSink.sink(STUDENT_INSERT, // SQL insert statement
                (PreparedStatement ps, Student student) -> {
                    ps.setString(1, student.getName1());
                    ps.setString(2, student.getName2());
                    ps.setString(3, student.getName3());
                    ps.setString(4, student.getName4());
                    ps.setString(5, student.getName5());
                    ps.setString(6, student.getName6());
                    ps.setString(7, student.getName7());
                    ps.setString(8, student.getName8());
                    ps.setString(9, student.getName9());
                    ps.setString(10, student.getName10());
                    ps.setString(11, student.getName11());
                    ps.setString(12, student.getName12());
                    ps.setString(13, student.getName13());
                    ps.setString(14, student.getName14());
                    ps.setString(15, student.getName15());
                    ps.setString(16, student.getName16());
                    ps.setString(17, student.getName17());
                    ps.setString(18, student.getName18());
                    ps.setString(19, student.getName19());
                    ps.setString(20, student.getName20());
                    ps.setString(21, student.getName21());
                    ps.setString(22, student.getName22());
                    ps.setString(23, student.getName23());
                    ps.setString(24, student.getName24());
                    ps.setString(25, student.getName25());
                    ps.setString(26, student.getName26());
                    ps.setString(27, student.getName27());
                    ps.setString(28, student.getName28());
                    ps.setString(29, student.getName29());
                    ps.setString(30, student.getName30());
                    ps.setString(31, student.getName31());
                    ps.setString(32, student.getName32());
                    ps.setString(33, student.getName33());
                    ps.setString(34, student.getName34());
                    ps.setString(35, student.getName35());
                    ps.setString(36, student.getName36());
                    ps.setString(37, student.getName37());
                    ps.setString(38, student.getName38());
                    ps.setString(39, student.getName39());
                    ps.setString(40, student.getName40());
                    ps.setString(41, student.getName41());
                    ps.setString(42, student.getName42());
                    ps.setString(43, student.getName43());
                    ps.setString(44, student.getName44());
                    ps.setString(45, student.getName45());
                    ps.setString(46, student.getName46());
                    ps.setString(47, student.getName47());
                    ps.setString(48, student.getName48());
                    ps.setString(49, student.getName49());
                    ps.setString(50, student.getName50());
                },
                getJdbcExecutionOptions(properties),
                getJdbcConnectionOptions(properties)
        );
    }
    /**
     * JDBC batch execution options
     */
    private static JdbcExecutionOptions getJdbcExecutionOptions(SinkProperties properties) {
        return JdbcExecutionOptions.builder()
                .withBatchSize(5000)      // flush once 5000 records are buffered
                .withBatchIntervalMs(200) // or after 200 ms, whichever comes first
                .withMaxRetries(3)        // maximum number of retries
                .build();
    }
    /**
     * JDBC connection options
     */
    public static JdbcConnectionOptions getJdbcConnectionOptions(SinkProperties properties) {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(properties.getJdbcUrl())
                .withDriverName(properties.getJdbcDriver())
                .withUsername(properties.getJdbcUsername())
                .withPassword(properties.getJdbcPassword())
                // .withDataSource(dataSource)  // connection-pool configuration is supported from Flink 1.17.x, which requires JDK 11
                .build();
    }
}
```
KafkaSourceUtil:
```java
package com.slink.util;

import com.slink.entity.SinkLog;
import com.slink.entity.Student;
import com.slink.properties.SinkProperties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceUtil {
    /**
     * Build the Kafka source.
     * Source parallelism: setParallelism(x) starts x parallel tasks that consume the Kafka topic.
     * Size it against the number of Kafka partitions: with x partitions, a parallelism of x gives
     * each partition its own consumer instance.
     */
    public static DataStream<Student> buildDataStream(SinkProperties properties, StreamExecutionEnvironment env, int parallelism) {
        // Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", properties.getKafkaServer());
        props.put("group.id", properties.getKafkaConsumerGroupId());
        props.put("auto.offset.reset", properties.getKafkaAutoOffsetReset());
        return env.addSource(new FlinkKafkaConsumer<>(
                        properties.getKafkaUserEventTopic(),
                        new SimpleStringSchema(),
                        props)).setParallelism(parallelism)
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String val) {
                        // Parse the raw string into a Student object
                        return SinkLog.build(val);
                    }
                }).filter(v -> v != null);
    }
}
```
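FlinkKafkaConsumer still works on 1.14 but is already deprecated there; the unified KafkaSource API is the recommended replacement. A sketch of an equivalent source (only the source construction changes; the map/filter chain above stays the same):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    public static DataStream<String> buildRawStream(StreamExecutionEnvironment env, String servers,
                                                    String topic, String groupId, int parallelism) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(servers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest()) // matches auto.offset.reset=latest
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // No event-time watermarks are needed here because the job uses processing-time windows
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(parallelism);
    }
}
```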
Utility class EmptyNullUtil:
```java
package com.slink.util;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class EmptyNullUtil {

    private static final Logger log = LoggerFactory.getLogger(EmptyNullUtil.class);

    /**
     * Replace every String property whose value is null or blank with ""
     */
    public static <T> void stringNullToEmpty(T t) {
        if (null == t) {
            return;
        }
        Field[] declaredFields = t.getClass().getDeclaredFields();
        for (Field field : declaredFields) {
            field.setAccessible(true);
            if (field.getType().equals(String.class)) {
                // Capitalize the first letter of the field name to form the getter name
                String methodName = field.getName().replaceFirst(field.getName().substring(0, 1), field.getName().substring(0, 1).toUpperCase());
                try {
                    Method methodGet = t.getClass().getMethod("get" + methodName);
                    // Call the getter to read the current value
                    String str = (String) methodGet.invoke(t);
                    if (StringUtils.isBlank(str)) {
                        // Reassign null/blank String properties to the empty string
                        field.set(t, StringUtils.EMPTY);
                    }
                } catch (Exception e) {
                    log.warn("[EmptyNullUtil.stringNullToEmpty] failed to reset field {}", field.getName(), e);
                }
            }
        }
    }
}
```
Custom window processing function StudentProcessWindowFunction:
```java
package com.slink.function;

import com.slink.entity.Student;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.stream.StreamSupport;

public class StudentProcessWindowFunction extends ProcessAllWindowFunction<Student, Student, TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(StudentProcessWindowFunction.class);

    @Override
    public void process(ProcessAllWindowFunction<Student, Student, TimeWindow>.Context context, Iterable<Student> iterable, Collector<Student> out) {
        log.info("Number of records aggregated in this window: {}", StreamSupport.stream(iterable.spliterator(), false).count());
        // Forward every Student in the window unchanged
        for (Student student : iterable) {
            out.collect(student);
        }
    }
}
```
4. Running the test
Simulate pushing one message per second to Kafka:
```java
package com.slink;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Test2 {
    public static void main(String[] args) {
        // One simulated access-log message (JSON); the inner quotes are escaped so the literal compiles
        String value = "{ \"timestamp\": \"2024-11-05T17:38:56+08:00\", \"remoteAddr\": \"61.132.71.10\",\"costime\": \"0.000\",\"realtime\": \"0.000\",\"status\": 405,\"xForwarded\": \"\",\"referer\": \"\",\"request\": \"POST /sink HTTP/1.1\",\"upstrAddr\": \"127.0.0.1:80\",\"bytes\":157,\"requestBody\":{\"name39\":\"拓扑丝路39号\",\"name38\":\"拓扑丝路38号\",\"name37\":\"拓扑丝路37号\",\"name36\":\"拓扑丝路36号\",\"name35\":\"拓扑丝路35号\",\"name34\":\"拓扑丝路34号\",\"name33\":\"拓扑丝路33号\",\"name32\":\"拓扑丝路32号\",\"name31\":\"拓扑丝路31号\",\"name30\":\"拓扑丝路30号\",\"name29\":\"拓扑丝路29号\",\"name28\":\"拓扑丝路28号\",\"name27\":\"拓扑丝路27号\",\"name6\":\"拓扑丝路6号\",\"name26\":\"拓扑丝路26号\",\"name5\":\"拓扑丝路5号\",\"name25\":\"拓扑丝路25号\",\"name4\":\"拓扑丝路4号\",\"name24\":\"拓扑丝路24号\",\"name3\":\"拓扑丝路3号\",\"name23\":\"拓扑丝路23号\",\"name22\":\"拓扑丝路22号\",\"name9\":\"拓扑丝路9号\",\"name21\":\"拓扑丝路21号\",\"name8\":\"拓扑丝路8号\",\"name20\":\"拓扑丝路20号\",\"name7\":\"拓扑丝路7号\",\"name2\":\"拓扑丝路2号\",\"name1\":\"拓扑丝路1号\",\"name19\":\"拓扑丝路19号\",\"name18\":\"拓扑丝路18号\",\"name17\":\"拓扑丝路17号\",\"name16\":\"拓扑丝路16号\",\"name15\":\"拓扑丝路15号\",\"name14\":\"拓扑丝路14号\",\"name13\":\"拓扑丝路13号\",\"name12\":\"拓扑丝路12号\",\"name11\":\"拓扑丝路11号\",\"name10\":\"拓扑丝路10号\",\"name50\":\"拓扑丝路50号\",\"name49\":\"拓扑丝路49号\",\"name48\":\"拓扑丝路48号\",\"name47\":\"拓扑丝路47号\",\"name46\":\"拓扑丝路46号\",\"name45\":\"拓扑丝路45号\",\"name44\":\"拓扑丝路44号\",\"name43\":\"拓扑丝路43号\",\"name42\":\"拓扑丝路42号\",\"name41\":\"拓扑丝路41号\",\"name40\":\"拓扑丝路40号\"},\"agent\": \"PostmanRuntime/7.42.0\" }";
        // Create the producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Tuning parameters
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024); // buffer up to 1 MB of records per partition
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 500);          // wait at most 0.5 s before sending a batch
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "key001", value);
            kafkaProducer.send(record);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}
```
Run the main class StudentRunner:

Query the MySQL records:
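For instance, a quick sanity check against the sink table (any query on student works):

```sql
-- the row count should grow by roughly one batch per 10-second window
SELECT COUNT(*) FROM student;
```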

The example runs successfully.
5. Summary
This example reads data from Kafka in real time, applies custom processing, and batch-writes the result to MySQL through Flink's window mechanism; the same pattern can be adapted to other sinks (Elasticsearch, Redis, etc.) as needed. It fits scenarios where strict real-time freshness in the database is not required, or near-real-time data analysis. When the data volume is large, say a 10-second window aggregates ten thousand records, batch writes like this are many times faster than inserting row by row.