flink sink kafka

[复制链接]
发表于 2025-11-23 20:02:29 | 显示全部楼层 |阅读模式
接上文:一文说清flink从编码到摆设上线
之前写了kafka source,如今增补kafka sink。完满kafka干系利用。
    环境分析:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;利用体系:CentOS 7.6;JDK:1.8.0_401;kafka_2.12-2.5.0。
  1. kafka 创建 topic

topic:rv-test-sink。

2.添加依赖

  1. <!--flink cdc kafka 相关依赖-->
  2.         <dependency>
  3.             <groupId>org.apache.flink</groupId>
  4.             <artifactId>flink-connector-kafka_2.11</artifactId>
  5.             <version>${flink.version}</version>
  6.         </dependency>
复制代码
3.创建运行环境

  1. package com.zl.utils;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
  4. import org.apache.flink.streaming.api.CheckpointingMode;
  5. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  8. import java.time.Duration;
  9. import java.time.ZoneOffset;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * EnvUtil
  13. * @description:
  14. */
  15. public class EnvUtil {
  16.     /**
  17.      * 设置flink执行环境
  18.      * @param parallelism 并行度
  19.      */
  20.     public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
  21.         // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
  22.         System.setProperty("HADOOP_USER_NAME", "root");
  23.         Configuration conf = new Configuration();
  24.         conf.setInteger("rest.port", 1000);
  25.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  26.         if (parallelism >0 ){
  27.             //设置并行度
  28.             env.setParallelism(parallelism);
  29.         } else {
  30.             env.setParallelism(1);// 默认1
  31.         }
  32.         // 添加重启机制
  33. //        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
  34.         // 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。
  35.         // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
  36.         env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
  37.         //rocksdb状态后端,启用增量checkpoint
  38.         env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
  39.         //设置checkpoint路径
  40.         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  41.         // 同一时间只允许一个 checkpoint 进行(默认)
  42.         checkpointConfig.setMaxConcurrentCheckpoints(1);
  43.         //最小间隔,10*60*1000=60000
  44.         checkpointConfig.setMinPauseBetweenCheckpoints(60000);
  45.         // 取消任务后,checkpoint仍然保存
  46.         checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  47.         //checkpoint容忍失败的次数
  48.         checkpointConfig.setTolerableCheckpointFailureNumber(5);
  49.         //checkpoint超时时间 默认10分钟
  50.         checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
  51.         //禁用operator chain(方便排查反压)
  52.         env.disableOperatorChaining();
  53.         return env;
  54.     }
  55.     public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
  56.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  57.         //设置时区 东八
  58.         tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
  59.         Configuration configuration = tenv.getConfig().getConfiguration();
  60.         // 开启miniBatch
  61.         configuration.setString("table.exec.mini-batch.enabled", "true");
  62.         // 批量输出的间隔时间
  63.         configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
  64.         // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
  65.         configuration.setString("table.exec.mini-batch.size", "20000");
  66.         // 开启LocalGlobal
  67.         configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
  68.         //设置TTL API指定
  69.         tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));
  70.         return tenv;
  71.     }
  72. }
复制代码
4.核心代码

  1. package com.zl.kafka;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zl.utils.EnvUtil;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  6. import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
  7. import org.apache.kafka.clients.producer.ProducerRecord;
  8. import javax.annotation.Nullable;
  9. import java.nio.charset.StandardCharsets;
  10. import java.util.Properties;
  11. public class KafkaExampleSink {
  12.     public static void main(String[] args) throws Exception {
  13.         // 配置运行环境,并行度1
  14.         StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
  15.         // 程序间隔离,每个程序单独设置
  16.         env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExampleSink");
  17.         /// ===== 构造kafka sink =====
  18.         // 相关参数配置可以参考下面这两个文档:①https://cloud.tencent.com/developer/article/2089393
  19.         // ②https://www.bilibili.com/opus/819228616166473783
  20.         // kafka配置
  21.         Properties prop = new Properties();
  22.         prop.setProperty("bootstrap.servers", "10.86.97.21:9092,10.86.97.21:9093,10.86.97.21:9094");
  23.         // 当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。
  24.         prop.setProperty("enable.idempotence", "true");
  25.         // 指定了生产者在接收到服务器相应之前可以发送多个消息,值越高,占用的内存越大,
  26.         // 当然也可以提升吞吐量,发生错误时,可能会造成数据的发送顺序改变,其默认值是5.
  27.         prop.setProperty("max.in.flight.requests.per.connection", "5");
  28.         prop.setProperty("acks", "all");
  29.         // 在kafka中消息发送失败时,指定生产者可以重发消息的次数,默认情况下,
  30.         // 生产者在每次重试之间默认等待100ms,可以通过参数retey.backoff.ms参数来改变这个时间间隔。retries的缺省值:0.
  31.         prop.setProperty("retries", "5");
  32.         // 事务超时时间
  33.         prop.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
  34.         String topic = "rv-test-sink";
  35.         FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
  36.                 topic,// topic
  37.                 new KafkaSerializationSchema<String>() {
  38.                     @Override
  39.                     public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
  40.                         return new ProducerRecord<>(topic, s.getBytes(StandardCharsets.UTF_8));
  41.                     }
  42.                 },
  43.                 prop,
  44.                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE
  45.         );
  46.         /// ===== 构造模拟数据 =====
  47.         JSONObject rvJsonObject = new JSONObject();
  48.         rvJsonObject.put("dt","2024-12-20");// 日期取当天
  49.         rvJsonObject.put("uuid","data-stream-1");
  50.         rvJsonObject.put("report_time",1733881971621L);
  51.         String mockJson = JSONObject.toJSONString(rvJsonObject);
  52.         /// ===== sink kafka =====
  53.         env.fromElements(mockJson).addSink(flinkKafkaProducer).setParallelism(3).name("kafka-sink").uid("kafka-sink");
  54.         env.execute("kafka-sink-job");
  55.     }// main
  56. }
复制代码
5.运行

由于不是一连输入流,运行完会竣事。

sink到kafka的数据如下:

6.完备代码

完备代码见:完备代码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表