Apache Flink 是一个流处置惩罚和批处置惩罚的开源平台,用于在分布式环境中处置惩罚无界和有界数据流。它提供了用于数据处置惩罚的数据流 API(DataStream API)和表 API(Table API),并可以与各种外部数据源和存储系统进行交互。
MongoDB 是一个基于文档的 NoSQL 数据库,它提供了高性能、可扩展和机动的数据存储。而 Apache Kafka 是一个流处置惩罚平台,它允许发布和订阅记录流,类似于消息队列或企业消息系统。
当 Flink 与 MongoDB 和 Kafka 结合使用时,可以构建强大的数据处置惩罚管道,用于实时数据流分析和批处置惩罚使命。以下是这些组件结合使用时可能的一些用途:
- Flink 可以作为 Kafka 的消费者(Consumer),从 Kafka 主题(Topics)中读取数据流,并对其进行实时处置惩罚。
- Flink 也可以将数据写入 Kafka,使其成为一个中间存储或数据通报的桥梁。
- 通过 Flink 的时间窗口和状态管理等特性,可以对 Kafka 中的数据流进行复杂的实时分析。
- Flink 可以从 MongoDB 中读取数据,用于批处置惩罚或实时分析。
- Flink 也可以将处置惩罚后的数据写入 MongoDB,用于长期化存储或进一步的数据分析。
使用 Flink 的表 API(Table API)和 SQL 支持,可以方便地对 MongoDB 中的数据进行查询和分析。
- Kafka、Flink 和 MongoDB 结合使用:
- Kafka 可以作为数据源,提供实时数据流给 Flink 进行处置惩罚。
- Flink 对 Kafka 中的数据流进行实时分析,并可能将效果写入 MongoDB 进行存储。
- MongoDB 中的数据也可以作为 Flink 批处置惩罚使命的输入,用于汗青数据分析或与其他数据源进行连合分析。
MONGO 2 KAFKA
下面例子是从mongo获取数据插入到kafka:
代码:
- public class MongoDBToKafka {
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 配置MongoDB源
- MongoSource<String> mongoSource = MongoSource.<String>builder()
- .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
- .setDatabase("sjzz")
- .setCollection("wellCastingInfo")
- // .setProjectedFields("_id", "f0", "f1")
- .setFetchSize(2048)
- .setLimit(10000)
- .setNoCursorTimeout(true)
- .setPartitionStrategy(PartitionStrategy.SAMPLE)
- .setPartitionSize(MemorySize.ofMebiBytes(64))
- .setSamplesPerPartition(10)
- .setDeserializationSchema(new MongoDeserializationSchema<String>() {
- @Override
- public String deserialize(BsonDocument document) {
- return document.toJson();
- }
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- })
- .build();
- // 创建MongoDB数据流
- DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
- // env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
- // .setParallelism(2)
- // .print()
- // .setParallelism(1);
- // 配置Kafkasink
- KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
- .setBootstrapServers("localhost:9092")
- // .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- // .setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- // 如果你使用String类型的键
- .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- // 如果你使用byte[]类型的值
- .setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
- .setRecordSerializer(KafkaRecordSerializationSchema.builder()
- .setTopic(Constants.TOPIC_NAME)
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .build();
- // 将数据流写入Kafka
- sourceStream.sinkTo(kafkaSink);
- // 执行任务
- env.execute("MongoDB to Kafka");
- }
- }
复制代码 pom.xml
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-mongodb</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- </dependency>
复制代码 KAFKA 2 FILE
从kafka获取数据写入到本地文件
代码:
- public class KafkaToWriteText {
- public static void main(String[] args) throws Exception {
- // 1. 设置 Flink 执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- String brokers = "localhost:9092";
- KafkaSource<String> source = KafkaSource.<String>builder()
- .setBootstrapServers(brokers)
- .setTopics(TOPIC_NAME)
- .setGroupId("my-group")
- .setStartingOffsets(OffsetsInitializer.earliest())
- .setValueOnlyDeserializer(new SimpleStringSchema())
- .build();
- DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
- // 创建RollingFileSink
- String outputPath = "sink.csv";
- FileSink<String> sink = FileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
- .withRollingPolicy(
- DefaultRollingPolicy.builder()
- .withRolloverInterval(Duration.ofMinutes(15))
- .withInactivityInterval(Duration.ofMinutes(5))
- .withMaxPartSize(MemorySize.ofMebiBytes(1024))
- .build())
- .build();
- rs.sinkTo(sink);
- // 6. 执行 Flink 作业
- env.execute("Kafka Flink Job");
- }
- }
复制代码 pom.xml
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- </dependency>
复制代码 KAFKA 部署
- 下载地点:
https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
- 运行zookeeper
- # Start the ZooKeeper service
- $ bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码- # Start the Kafka broker service
- $ bin/kafka-server-start.sh config/server.properties
复制代码- # 接受信息
- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo --from-beginning
- # 发送信息
- kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |