1. Preliminary Preparation
(1) Install the basic Flink environment
Reference article:
利用docker-compose来搭建flink集群-CSDN博客
If the output looks like this, the setup succeeded.
(2) Install and configure Docker, docker-compose, and the Kafka cluster
Reference article:
利用docker搭建kafka集群而且进行相应的实践-CSDN博客
That article contains links to two more articles; open them from there to see the details.
(3) On Windows, create a database named mysql1 (if it does not already exist), then create a table min_table inside it.
The SQL is as follows:
create database if not exists mysql1; -- the SQL comment marker is '-- ' (note the trailing space)
use mysql1;
CREATE TABLE min_table (
id INT AUTO_INCREMENT PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
quantity INT NOT NULL,
amount DOUBLE NOT NULL,
UNIQUE KEY unique_timestamp (timestamp)
);
(4) Next, install MySQL on the Linux virtual machine where Flink is installed and configured.
Reference article: 黑马大数据学习笔记4-Hive部署和根本利用_黑马大数据 hive笔记-CSDN博客
(5) Then, in the same way, create a database named mysql1 in MySQL on the Linux virtual machine (if it does not already exist), and create a table min_table inside it.
The SQL is as follows:
create database if not exists mysql1; -- the SQL comment marker is '-- ' (note the trailing space)
use mysql1;
CREATE TABLE min_table (
id INT AUTO_INCREMENT PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
quantity INT NOT NULL,
amount DOUBLE NOT NULL,
UNIQUE KEY unique_timestamp (timestamp)
);
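Note: the job submitted to the Flink cluster later connects to this MySQL instance over the network (jdbc:mysql://192.168.88.101:3306/...), so MySQL on the VM must accept remote connections. This is not needed on every installation, but if the Flink job later fails with "Access denied" or connection-refused errors, a common adjustment (a sketch only; tighten the user/host pattern to your own security requirements, and make sure bind-address is not limited to 127.0.0.1) is:
# Allow the root account to connect from other hosts (e.g. the Flink containers).
mysql -uroot -p -e "CREATE USER IF NOT EXISTS 'root'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON mysql1.* TO 'root'@'%';
FLUSH PRIVILEGES;"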
(6) In IDEA, create a new Maven project named FlinkDemo, then add the following configuration to pom.xml:
<dependencies>
    <!-- Flink core libraries -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
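With the assembly plugin configured this way, you can also build from the command line instead of double-clicking package in IDEA; this is standard maven-assembly-plugin behavior, producing both the plain jar and a fat jar whose name ends in jar-with-dependencies.jar under target/:
# Build the project and list the resulting jars.
mvn clean package
ls target/*.jar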
(7) Create the following three files under the project's com.example package.
The directory structure looks like this:
DatabaseSink.java
package com.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;

import java.sql.PreparedStatement;
import java.sql.Timestamp;

public class DatabaseSink {
    private String url;
    private String username;
    private String password;

    public DatabaseSink(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    public void addSink(DataStream<Tuple3<Timestamp, Long, Double>> stream) {
        stream.addSink(JdbcSink.sink(
                "INSERT INTO min_table (timestamp, quantity, amount) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount)",
                (ps, t) -> {
                    ps.setTimestamp(1, t.f0);
                    ps.setLong(2, t.f1);
                    ps.setDouble(3, t.f2);
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(this.url)
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername(this.username)
                        .withPassword(this.password)
                        .build()
        ));
    }
}
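The INSERT ... ON DUPLICATE KEY UPDATE statement in this sink relies on the UNIQUE KEY on timestamp created earlier: rows for the same minute are accumulated instead of duplicated. A quick way to see this behavior by hand (a sketch, assuming the mysql1 database and the root/123456 credentials from the earlier steps):
# Insert the same minute twice; the second insert is folded into the existing row.
mysql -uroot -p123456 mysql1 -e "
INSERT INTO min_table (timestamp, quantity, amount) VALUES ('2024-01-02 09:30:00', 10, 100.0)
  ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount);
INSERT INTO min_table (timestamp, quantity, amount) VALUES ('2024-01-02 09:30:00', 5, 50.0)
  ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount);
SELECT timestamp, quantity, amount FROM min_table;"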
LocalFlinkTest.java
package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class LocalFlinkTest {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                333, // number of restart attempts
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));
        // Note: this second call overrides the fixed-delay strategy configured above.
        env.setRestartStrategy(RestartStrategies.noRestart());

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // your Kafka broker addresses
                .setGroupId("testGroup") // your consumer group ID
                .setTopics("foo") // your topic
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) // start from the group's committed offsets, falling back to the latest offset
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // flatMap takes one input element and may emit zero, one, or more elements.
        // Here the input is one line read from Kafka and the output is a tuple used for the aggregation:
        // per-minute (and cumulative) total transaction amount and quantity.
        // DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");
        DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        // Skip the header line (assumed to start with "time").
                        return !value.startsWith("time");
                    }
                })
                .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long, Double>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple3<Timestamp, Long, Double>> out) {
                        try {
                            String[] fields = line.split(",");
                            String s = fields[0];
                            // Parsing with a minute-level pattern effectively sets the seconds to 0.
                            Date date = sdf.parse(s);
                            Timestamp sqlTimestamp = new Timestamp(date.getTime());
                            double price = Double.parseDouble(fields[3]);
                            long quantity = Long.parseLong(fields[4]);
                            double amount = price * quantity;
                            out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
                            // System.out.println(line);
                        } catch (Exception e) {
                            // Records that fail to parse are only printed, i.e. dropped from the stream.
                            System.out.println(line);
                        }
                    }
                });

        // Aggregate inside a 10-second tumbling processing-time window.
        // keyBy(t -> t.f0) keys by the first field (the minute timestamp), so records
        // reduced together share the same timestamp.
        DataStream<Tuple3<Timestamp, Long, Double>> oneSecondAmounts = transactionVolumes
                .keyBy(t -> t.f0)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((Tuple3<Timestamp, Long, Double> value1, Tuple3<Timestamp, Long, Double> value2) -> {
                    // System.out.println(Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2));
                    return Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
                });

        oneSecondAmounts.print();

        DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");
        dbSink.addSink(oneSecondAmounts);

        env.execute("Kafka Flink Demo");
    }
}
In the line DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456"); the password should be changed to your own (the author's is 123456).
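If you want to exercise the parsing logic without the full stock CSV, you can also push a couple of hand-written lines into the topic. The job expects at least five comma-separated fields: the time in column 1 (parsed as yyyy-MM-dd HH:mm, so seconds are dropped), the price in column 4, and the quantity in column 5; the values below, including columns 2 and 3, are made up purely for illustration:
# Run inside the kafka1 container, using the same brokers and topic as the rest of this article.
printf '%s\n' \
  "2024-01-02 09:30:15,600000,SH,10.50,200" \
  "2024-01-02 09:30:40,600000,SH,10.60,300" \
  | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo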
FlinkTest.java
package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class FlinkTest {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                333, // number of restart attempts
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));
        // Note: this second call overrides the fixed-delay strategy configured above.
        env.setRestartStrategy(RestartStrategies.noRestart());

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // your Kafka broker addresses
                .setGroupId("testGroup") // your consumer group ID
                .setTopics("foo") // your topic
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) // start from the group's committed offsets, falling back to the latest offset
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // flatMap takes one input element and may emit zero, one, or more elements.
        // Here the input is one line read from Kafka and the output is a tuple used for the aggregation:
        // per-minute (and cumulative) total transaction amount and quantity.
        // DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");
        DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        // Skip the header line (assumed to start with "time").
                        return !value.startsWith("time");
                    }
                })
                .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long, Double>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple3<Timestamp, Long, Double>> out) {
                        try {
                            String[] fields = line.split(",");
                            String s = fields[0];
                            // Parsing with a minute-level pattern effectively sets the seconds to 0.
                            Date date = sdf.parse(s);
                            Timestamp sqlTimestamp = new Timestamp(date.getTime());
                            double price = Double.parseDouble(fields[3]);
                            long quantity = Long.parseLong(fields[4]);
                            double amount = price * quantity;
                            out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
                            // System.out.println(line);
                        } catch (Exception e) {
                            // Records that fail to parse are only printed, i.e. dropped from the stream.
                            System.out.println(line);
                        }
                    }
                });

        // Aggregate inside a 10-second tumbling processing-time window.
        // keyBy(t -> t.f0) keys by the first field (the minute timestamp), so records
        // reduced together share the same timestamp.
        DataStream<Tuple3<Timestamp, Long, Double>> oneSecondAmounts = transactionVolumes
                .keyBy(t -> t.f0)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((Tuple3<Timestamp, Long, Double> value1, Tuple3<Timestamp, Long, Double> value2) -> {
                    // System.out.println(Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2));
                    return Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
                });

        oneSecondAmounts.print();

        DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");
        dbSink.addSink(oneSecondAmounts);

        env.execute("Kafka Flink Demo");
    }
}
In the line DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456"); the password and the host address (192.168.88.101) should be changed to your own.
2. Running the Experiment: a Local Test and a Flink Cluster Test
(1) Start node1, open FinalShell, then start Docker, the Kafka cluster, and the Flink cluster:
systemctl start docker
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps
The output should look like this:
(2) First run the local test (only the Kafka cluster is needed here).
Open two terminal windows on node1.
In the second window, enter the kafka2 container and start a console consumer.
Commands:
docker exec -it kafka2 /bin/bash
cd /opt/bitnami/kafka/bin
kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
The output should look like this:
Go to IDEA and run LocalFlinkTest.java.
In the first window, enter the kafka1 container and send the first 5 lines of the file:
[root@node1 server]# docker exec -it kafka1 /bin/bash
root@a2f7152188c1:/# cd /opt/bitnami/kafka/bin
root@a2f7152188c1:/opt/bitnami/kafka/bin# head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
root@a2f7152188c1:/opt/bitnami/kafka/bin#
Commands:
docker exec -it kafka1 /bin/bash
cd /opt/bitnami/kafka/bin
head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
Then check the output in IDEA.
Then check the data in MySQL; a quick query is shown below.
At this point the local test has succeeded!
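To check the results from the command line (assuming the Windows-side MySQL and the password from step (3)):
# Show the most recently aggregated minutes.
mysql -uroot -p123456 mysql1 -e "SELECT * FROM min_table ORDER BY timestamp DESC LIMIT 10;"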
(3) Now run the Flink cluster test.
First run FlinkTest.java once in IDEA, then double-click the Maven package goal and check that the two new jar files appear under the target directory.
Open the web UI at node1:8081 and upload the jar with the longer name (the jar-with-dependencies one).
Navigate to this path:
D:\JetBrains\idea-project\FlinkDemo\target
(in other words, wherever your target directory is).
After the upload succeeds,
click the uploaded jar entry and enter com.example.FlinkTest as the entry class
(this is the fully qualified name of FlinkTest.java in the project),
and set the parallelism to 3.
Then click Submit; the job should show as running normally.
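If you prefer the command line over the web UI, the same fat jar can be submitted with the flink CLI from inside the jobmanager container. This is only a sketch: the jar file name depends on your artifactId and version, so adjust the path accordingly.
# Copy the fat jar into the jobmanager container, then submit it with main class com.example.FlinkTest and parallelism 3.
docker cp target/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar jobmanager:/tmp/
docker exec -it jobmanager flink run -c com.example.FlinkTest -p 3 /tmp/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar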
Go back to the first node1 window. At the prompt
root@41d3910fe6c9:/opt/bitnami/kafka/bin# (i.e. in kafka1's /opt/bitnami/kafka/bin directory), run the following command to send the whole file:
cat /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
In the Flink web UI, open any running task and select numRecordsInPerSecond among the metrics to see how many records are processed per second.
4. Summary
Startup
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps

Or, with the alternative compose file:
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f bigwork_flink.yml up -d
docker ps

redis-server be_redis.conf
Shutdown
cd /export/server
docker-compose -f flink.yml down
docker-compose -f kafka.yml down
docker ps

Or, with the alternative compose file:
cd /export/server
docker-compose -f bigwork_flink.yml down
docker-compose -f kafka.yml down
docker ps

You can add the alternative bigwork_flink.yml like this:
cd /export/server/
vi bigwork_flink.yml
Its contents are as follows:
version: '3'
services:
  jobmanager:
    image: flink:latest
    container_name: jobmanager
    hostname: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - /export/server/dockerflink:/example/data
    networks:
      - flink_network
  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    ports:
      - "8082-8085:8081"   # make sure the port range is large enough
    command: taskmanager
    scale: 4               # run 4 TaskManager instances
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4
    networks:
      - flink_network
networks:   # note: this key must be at the same level as services
  flink_network:
    driver: bridge
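After bringing this compose file up, you can ask the JobManager's REST API whether all four TaskManagers and their task slots have registered. This is a quick sanity check; node1 is the hostname used throughout this article, and /overview is a standard Flink REST endpoint:
# Returns a small JSON document with taskmanagers, slots-total, slots-available, etc.
curl -s http://node1:8081/overview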