Flink mongo & Kafka

农民  金牌会员 | 2024-6-19 22:46:27 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 797|帖子 797|积分 2391

Apache Flink 是一个流处置惩罚和批处置惩罚的开源平台,用于在分布式环境中处置惩罚无界和有界数据流。它提供了用于数据处置惩罚的数据流 API(DataStream API)和表 API(Table API),并可以与各种外部数据源和存储系统进行交互。
MongoDB 是一个基于文档的 NoSQL 数据库,它提供了高性能、可扩展和机动的数据存储。而 Apache Kafka 是一个流处置惩罚平台,它允许发布和订阅记录流,类似于消息队列或企业消息系统。
当 Flink 与 MongoDB 和 Kafka 结合使用时,可以构建强大的数据处置惩罚管道,用于实时数据流分析和批处置惩罚使命。以下是这些组件结合使用时可能的一些用途:

  • Flink 与 Kafka:


  • Flink 可以作为 Kafka 的消费者(Consumer),从 Kafka 主题(Topics)中读取数据流,并对其进行实时处置惩罚。
  • Flink 也可以将数据写入 Kafka,使其成为一个中间存储或数据通报的桥梁。
  • 通过 Flink 的时间窗口和状态管理等特性,可以对 Kafka 中的数据流进行复杂的实时分析。

  • Flink 与 MongoDB:


  • Flink 可以从 MongoDB 中读取数据,用于批处置惩罚或实时分析。
  • Flink 也可以将处置惩罚后的数据写入 MongoDB,用于长期化存储或进一步的数据分析。
    使用 Flink 的表 API(Table API)和 SQL 支持,可以方便地对 MongoDB 中的数据进行查询和分析。

  • Kafka、Flink 和 MongoDB 结合使用:


  • Kafka 可以作为数据源,提供实时数据流给 Flink 进行处置惩罚。
  • Flink 对 Kafka 中的数据流进行实时分析,并可能将效果写入 MongoDB 进行存储。
  • MongoDB 中的数据也可以作为 Flink 批处置惩罚使命的输入,用于汗青数据分析或与其他数据源进行连合分析。
MONGO 2 KAFKA

下面例子是从mongo获取数据插入到kafka:
代码:
  1. public class MongoDBToKafka {
  2.     public static void main(String[] args) throws Exception {
  3.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 配置MongoDB源
  5.         MongoSource<String> mongoSource = MongoSource.<String>builder()
  6.                 .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
  7.                 .setDatabase("sjzz")
  8.                 .setCollection("wellCastingInfo")
  9. //                .setProjectedFields("_id", "f0", "f1")
  10.                 .setFetchSize(2048)
  11.                 .setLimit(10000)
  12.                 .setNoCursorTimeout(true)
  13.                 .setPartitionStrategy(PartitionStrategy.SAMPLE)
  14.                 .setPartitionSize(MemorySize.ofMebiBytes(64))
  15.                 .setSamplesPerPartition(10)
  16.                 .setDeserializationSchema(new MongoDeserializationSchema<String>() {
  17.                     @Override
  18.                     public String deserialize(BsonDocument document) {
  19.                         return document.toJson();
  20.                     }
  21.                     @Override
  22.                     public TypeInformation<String> getProducedType() {
  23.                         return BasicTypeInfo.STRING_TYPE_INFO;
  24.                     }
  25.                 })
  26.                 .build();
  27.         // 创建MongoDB数据流
  28.         DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
  29. //        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
  30. //                .setParallelism(2)
  31. //                .print()
  32. //                .setParallelism(1);
  33.         // 配置Kafkasink
  34.         KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  35.                 .setBootstrapServers("localhost:9092")
  36. //                .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  37. //                .setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  38.                 // 如果你使用String类型的键
  39.                 .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  40.                 // 如果你使用byte[]类型的值
  41.                 .setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  42.                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  43.                         .setTopic(Constants.TOPIC_NAME)
  44.                         .setValueSerializationSchema(new SimpleStringSchema())
  45.                         .build()
  46.                 )
  47.                 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  48.                 .build();
  49.         // 将数据流写入Kafka
  50.         sourceStream.sinkTo(kafkaSink);
  51.         // 执行任务
  52.         env.execute("MongoDB to Kafka");
  53.     }
  54. }
复制代码
pom.xml
  1.                 <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-connector-mongodb</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.apache.flink</groupId>
  7.             <artifactId>flink-connector-base</artifactId>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>org.apache.flink</groupId>
  11.             <artifactId>flink-streaming-java</artifactId>
  12.         </dependency>
复制代码
KAFKA 2 FILE

从kafka获取数据写入到本地文件
代码:
  1. public class KafkaToWriteText {
  2.     public static void main(String[] args) throws Exception {
  3.         // 1. 设置 Flink 执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         String brokers = "localhost:9092";
  6.         KafkaSource<String> source = KafkaSource.<String>builder()
  7.                 .setBootstrapServers(brokers)
  8.                 .setTopics(TOPIC_NAME)
  9.                 .setGroupId("my-group")
  10.                 .setStartingOffsets(OffsetsInitializer.earliest())
  11.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  12.                 .build();
  13.         DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  14.         // 创建RollingFileSink
  15.         String outputPath = "sink.csv";
  16.         FileSink<String> sink = FileSink
  17.                 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
  18.                 .withRollingPolicy(
  19.                         DefaultRollingPolicy.builder()
  20.                                 .withRolloverInterval(Duration.ofMinutes(15))
  21.                                 .withInactivityInterval(Duration.ofMinutes(5))
  22.                                 .withMaxPartSize(MemorySize.ofMebiBytes(1024))
  23.                                 .build())
  24.                 .build();
  25.         rs.sinkTo(sink);
  26.         // 6. 执行 Flink 作业
  27.         env.execute("Kafka Flink Job");
  28.     }
  29. }
复制代码
pom.xml
  1.                 <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-connector-files</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.apache.flink</groupId>
  7.             <artifactId>flink-connector-base</artifactId>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>org.apache.flink</groupId>
  11.             <artifactId>flink-streaming-java</artifactId>
  12.         </dependency>
复制代码
KAFKA 部署


  • 下载地点:
    https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
  • 运行zookeeper
  1. # Start the ZooKeeper service
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码

  • 运行kafka
  1. # Start the Kafka broker service
  2. $ bin/kafka-server-start.sh config/server.properties
复制代码

  • 验证
  1. # 接受信息
  2. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo --from-beginning
  3. # 发送信息
  4. kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农民

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表