ToB Enterprise Services App Market: ToB Review and Business Networking Platform
Title:
Flink-05 Flink Java 3-Minute Quick Start: Redis FlinkJedisPoolConfig, Writing from Kafka to Re
Author:
饭宝
Date:
2024-10-14 20:45
Code repository
The code is synced to GitHub:
https://github.com/turbo-duck/flink-demo
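Overview
In the previous section we consumed and processed Kafka data with Flink and printed the results to the console. The setup was as follows:
Kafka In Docker
TestKafkaProducer
[Figure: result of writing data to Kafka]
FlinkConsumer
[Figure: result of Flink consuming from Kafka, with the data processed as required]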
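This section
This section still uses Flink to consume from Kafka. Unlike the previous section, which printed the results to the console, here the results computed by Flink are written to Redis for storage. (They could of course be stored elsewhere; Redis is simply the example used here.)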
pom.xml
Pay particular attention to the flink-connector-redis_2.11 artifact, which is the Redis-related dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo-01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Kafka connector: use the same Scala suffix as the other Flink artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- Redis connector: the dependency this section relies on -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>
TestKafkaProducer.java
Produces test data and writes it to Kafka.
package icu.wzk.demo05;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestKafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Broker address; adjust it to match your own Kafka setup.
        props.put("bootstrap.servers", "0.0.0.0:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send 500 records to the "test" topic, one every 200 ms.
        for (int i = 0; i < 500; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        // close() flushes any records that are still buffered.
        producer.close();
    }
}
StartApp
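Flink consumes from Kafka, processes the data, and writes the result to Redis.
FlinkJedisPoolConfig
Configuration of the Redis connection pool.
MyRedisMapper
A custom mapper, which must implement RedisMapper.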
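To make the mapper's role concrete, here is a minimal in-memory sketch of what happens per record when the command is LPUSH: the sink asks the mapper for a key and a value, then pushes the value onto the head of the list stored at that key. Everything in it is an assumption made for illustration: `KeyValueMapper` is a hypothetical stand-in for the connector's RedisMapper (only the two data accessors), `LpushSketch` imitates a Redis list with a `Deque`, and a `String[]` pair stands in for `Tuple2<String, String>`. It does not talk to Redis or Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the connector's RedisMapper: only the two data accessors.
interface KeyValueMapper<T> {
    String getKeyFromData(T data);
    String getValueFromData(T data);
}

// In-memory imitation of Redis lists; NOT a Redis client.
class LpushSketch {
    private final Map<String, Deque<String>> lists = new HashMap<>();

    // LPUSH semantics: insert the value at the head of the list stored at key.
    void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
    }

    // Counterpart of "LRANGE key 0 -1": the whole list, head first.
    List<String> lrangeAll(String key) {
        return new ArrayList<>(lists.getOrDefault(key, new ArrayDeque<>()));
    }

    // Conceptually what a sink does per record: ask the mapper, then issue the command.
    <T> void invoke(T record, KeyValueMapper<T> mapper) {
        lpush(mapper.getKeyFromData(record), mapper.getValueFromData(record));
    }

    public static void main(String[] args) {
        // Records are {key, value} pairs, mirroring the Tuple2<String, String> used in the job.
        KeyValueMapper<String[]> mapper = new KeyValueMapper<String[]>() {
            @Override public String getKeyFromData(String[] data) { return data[0]; }
            @Override public String getValueFromData(String[] data) { return data[1]; }
        };
        LpushSketch store = new LpushSketch();
        for (int i = 0; i < 3; i++) {
            store.invoke(new String[] {"l_words", "value-" + i}, mapper);
        }
        System.out.println(store.lrangeAll("l_words")); // prints [value-2, value-1, value-0]
    }
}
```

Because LPUSH prepends, the newest record ends up first. With the real job, reading the `l_words` list from its head should therefore show the most recently consumed Kafka values first.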
Full code
package icu.wzk.demo05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class StartApp {

    // Host only: the port is appended when bootstrap.servers is built below.
    private static final String KAFKA_SERVER = "0.0.0.0";
    private static final Integer KAFKA_PORT = 9092;
    private static final String KAFKA_TOPIC = "test";
    private static final String REDIS_SERVER = "0.0.0.0";
    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source: read each record of the topic as a plain string.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", String.format("%s:%d", KAFKA_SERVER, KAFKA_PORT));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);

        // Pair every value with the fixed Redis key "l_words".
        SingleOutputStreamOperator<Tuple2<String, String>> wordData = data.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // Redis connection pool configuration.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig
                .Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();

        // Redis sink: MyRedisMapper decides the command, key and value for each record.
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        // LPUSH: push each value onto the head of the list stored at the key.
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        // f0 holds the Redis key ("l_words").
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // f1 holds the value consumed from Kafka.
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}
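The job builds `bootstrap.servers` with `String.format("%s:%d", ...)`, which only yields a valid address when the server constant holds the host alone. If the constant already ends in `:9092`, the result becomes `host:9092:9092`. A minimal sketch of that check follows; `BootstrapAddress` is a hypothetical helper written for this note, not part of Flink or Kafka, and it assumes a plain host name or IPv4 address.

```java
// Hypothetical helper for illustration; not part of Flink or the Kafka client.
class BootstrapAddress {

    // Builds "host:port" and rejects a host that already carries a port.
    // Simplification: any ':' is treated as an error, so IPv6 literals are not supported.
    static String of(String host, int port) {
        if (host.contains(":")) {
            throw new IllegalArgumentException("host must not include a port: " + host);
        }
        return String.format("%s:%d", host, port);
    }

    public static void main(String[] args) {
        System.out.println(of("0.0.0.0", 9092)); // prints 0.0.0.0:9092
        try {
            of("0.0.0.0:9092", 9092);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping host and port in separate constants, as the Redis settings in the job already do, avoids this class of mistake.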