ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink mongo & Kafka [打印本页]

作者: 农民    时间: 2024-6-19 22:46
标题: Flink mongo & Kafka
Apache Flink 是一个流处置惩罚和批处置惩罚的开源平台,用于在分布式环境中处置惩罚无界和有界数据流。它提供了用于数据处置惩罚的数据流 API(DataStream API)和表 API(Table API),并可以与各种外部数据源和存储系统进行交互。
MongoDB 是一个基于文档的 NoSQL 数据库,它提供了高性能、可扩展和机动的数据存储。而 Apache Kafka 是一个流处置惩罚平台,它允许发布和订阅记录流,类似于消息队列或企业消息系统。
当 Flink 与 MongoDB 和 Kafka 结合使用时,可以构建强大的数据处置惩罚管道,用于实时数据流分析和批处置惩罚使命。以下是这些组件结合使用时可能的一些用途:



MONGO 2 KAFKA

下面例子是从mongo获取数据插入到kafka:
代码:
  1. public class MongoDBToKafka {
  2.     public static void main(String[] args) throws Exception {
  3.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 配置MongoDB源
  5.         MongoSource<String> mongoSource = MongoSource.<String>builder()
  6.                 .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
  7.                 .setDatabase("sjzz")
  8.                 .setCollection("wellCastingInfo")
  9. //                .setProjectedFields("_id", "f0", "f1")
  10.                 .setFetchSize(2048)
  11.                 .setLimit(10000)
  12.                 .setNoCursorTimeout(true)
  13.                 .setPartitionStrategy(PartitionStrategy.SAMPLE)
  14.                 .setPartitionSize(MemorySize.ofMebiBytes(64))
  15.                 .setSamplesPerPartition(10)
  16.                 .setDeserializationSchema(new MongoDeserializationSchema<String>() {
  17.                     @Override
  18.                     public String deserialize(BsonDocument document) {
  19.                         return document.toJson();
  20.                     }
  21.                     @Override
  22.                     public TypeInformation<String> getProducedType() {
  23.                         return BasicTypeInfo.STRING_TYPE_INFO;
  24.                     }
  25.                 })
  26.                 .build();
  27.         // 创建MongoDB数据流
  28.         DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
  29. //        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
  30. //                .setParallelism(2)
  31. //                .print()
  32. //                .setParallelism(1);
  33.         // 配置Kafkasink
  34.         KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  35.                 .setBootstrapServers("localhost:9092")
  36. //                .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  37. //                .setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  38.                 // 如果你使用String类型的键
  39.                 .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  40.                 // 如果你使用byte[]类型的值
  41.                 .setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  42.                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  43.                         .setTopic(Constants.TOPIC_NAME)
  44.                         .setValueSerializationSchema(new SimpleStringSchema())
  45.                         .build()
  46.                 )
  47.                 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  48.                 .build();
  49.         // 将数据流写入Kafka
  50.         sourceStream.sinkTo(kafkaSink);
  51.         // 执行任务
  52.         env.execute("MongoDB to Kafka");
  53.     }
  54. }
复制代码
pom.xml
  1.                 <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-connector-mongodb</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.apache.flink</groupId>
  7.             <artifactId>flink-connector-base</artifactId>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>org.apache.flink</groupId>
  11.             <artifactId>flink-streaming-java</artifactId>
  12.         </dependency>
复制代码
KAFKA 2 FILE

从kafka获取数据写入到本地文件
代码:
  1. public class KafkaToWriteText {
  2.     public static void main(String[] args) throws Exception {
  3.         // 1. 设置 Flink 执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         String brokers = "localhost:9092";
  6.         KafkaSource<String> source = KafkaSource.<String>builder()
  7.                 .setBootstrapServers(brokers)
  8.                 .setTopics(TOPIC_NAME)
  9.                 .setGroupId("my-group")
  10.                 .setStartingOffsets(OffsetsInitializer.earliest())
  11.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  12.                 .build();
  13.         DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  14.         // 创建RollingFileSink
  15.         String outputPath = "sink.csv";
  16.         FileSink<String> sink = FileSink
  17.                 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
  18.                 .withRollingPolicy(
  19.                         DefaultRollingPolicy.builder()
  20.                                 .withRolloverInterval(Duration.ofMinutes(15))
  21.                                 .withInactivityInterval(Duration.ofMinutes(5))
  22.                                 .withMaxPartSize(MemorySize.ofMebiBytes(1024))
  23.                                 .build())
  24.                 .build();
  25.         rs.sinkTo(sink);
  26.         // 6. 执行 Flink 作业
  27.         env.execute("Kafka Flink Job");
  28.     }
  29. }
复制代码
pom.xml
  1.                 <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-connector-files</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.apache.flink</groupId>
  7.             <artifactId>flink-connector-base</artifactId>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>org.apache.flink</groupId>
  11.             <artifactId>flink-streaming-java</artifactId>
  12.         </dependency>
复制代码
KAFKA 部署

  1. # Start the ZooKeeper service
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码
  1. # Start the Kafka broker service
  2. $ bin/kafka-server-start.sh config/server.properties
复制代码
  1. # 接受信息
  2. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo --from-beginning
  3. # 发送信息
  4. kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4