ToB Enterprise Services App Market: a ToB review and business social-networking industry platform

Title: Flink-05 Flink Java 3-Minute Quick Start: Redis FlinkJedisPoolConfig, Writing from Kafka to Re

Author: 饭宝    Time: 2024-10-14 20:45
Code Repository

The code is kept in sync on GitHub:
https://github.com/turbo-duck/flink-demo

Introduction

In the previous article we consumed data from Kafka, computed on it, and printed the results to the console, as shown in the screenshots below.

Kafka In Docker

TestKafkaProducer

The result of writing the data into Kafka:

FlinkConsumer

The result of Flink consuming from Kafka is shown below; the data has been computed according to our requirements.

This Article

This article again uses Flink to consume from Kafka. The difference from the previous article (which printed the results to the console) is that the computed results are now written into Redis for storage (other stores would work just as well; Redis is simply the example here).
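Stripped of all the Flink wiring, the per-record transformation this article performs is tiny: each Kafka record value is wrapped into a ("l_words", value) pair, where "l_words" is the Redis list key the sink will LPUSH to. A Flink-free sketch (a plain String array stands in for Flink's Tuple2; the class name here is made up for illustration):

```java
// Flink-free sketch of the per-record transformation used in this article:
// each Kafka record value becomes a ("l_words", value) pair, and
// "l_words" is the Redis list key that LPUSH will target.
public class RedisPairSketch {

    // Mirrors the MapFunction in StartApp below: value -> ("l_words", value).
    public static String[] toRedisPair(String value) {
        return new String[] { "l_words", value };
    }

    public static void main(String[] args) {
        String[] pair = toRedisPair("value-0");
        System.out.println(pair[0] + " -> " + pair[1]); // prints: l_words -> value-0
    }
}
```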
pom.xml

Pay particular attention to the flink-connector-redis_2.11 artifact; it is the Redis connector dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-demo-01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>
TestKafkaProducer.java

Produces data and writes it into Kafka:
package icu.wzk.demo05;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestKafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.0.0.0:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send 500 key/value pairs to the "test" topic, one every 200 ms
        for (int i = 0; i < 500; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        producer.close();
    }
}
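The producer assumes a topic named test already exists. If your broker does not auto-create topics, it can be created with the Kafka CLI that ships with the distribution; a sketch, assuming a typical installation (the script path and broker address vary by setup):

```shell
# Broker address and script location are assumptions; adjust to your installation.
kafka-topics.sh --create \
  --bootstrap-server 127.0.0.1:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic test
```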
StartApp

Flink consumes from Kafka and, after computing, writes the results into Redis.
FlinkJedisPoolConfig

Configuration for the Jedis connection pool.
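The complete code below only sets the host and port, which is enough for an unauthenticated local Redis. The Builder also exposes further pool and connection settings; a hedged configuration sketch (the password, database, and pool values here are placeholders, and I am assuming these setters exist in this connector version):

```java
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

// Fragment for illustration only; values are hypothetical.
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost("0.0.0.0")
        .setPort(6379)
        .setTimeout(2000)        // connection timeout in ms
        .setDatabase(0)          // Redis logical database index
        .setPassword("secret")   // only if your Redis requires AUTH
        .setMaxTotal(8)          // max pooled connections
        .build();
```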

MyRedisMapper

A custom mapper; it must implement the RedisMapper interface.

Complete Code

package icu.wzk.demo05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class StartApp {

    // Host only; the port is appended via String.format below.
    private static final String KAFKA_SERVER = "0.0.0.0";
    private static final Integer KAFKA_PORT = 9092;
    private static final String KAFKA_TOPIC = "test";
    private static final String REDIS_SERVER = "0.0.0.0";
    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", String.format("%s:%d", KAFKA_SERVER, KAFKA_PORT));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);

        // Wrap each record into ("l_words", value) so the sink knows the Redis key
        SingleOutputStreamOperator<Tuple2<String, String>> wordData = data.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // Jedis connection-pool configuration for the Redis sink
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // LPUSH each element onto a Redis list
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}
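Once the job has been running for a while, the values LPUSHed by the sink can be inspected with redis-cli (assuming an unauthenticated Redis on the default port; the key name l_words comes from MyRedisMapper above):

```shell
# How many elements has the job pushed so far?
redis-cli LLEN l_words

# Peek at the ten most recently pushed values (LPUSH inserts at the head).
redis-cli LRANGE l_words 0 9
```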