实验3-及时数据流处理-Flink

打印 上一主题 下一主题

主题 833|帖子 833|积分 2499

1.前期准备

(1)Flink底子环境安装

参考文章:
利用docker-compose来搭建flink集群-CSDN博客
显示为这样就乐成了

(2)把docker,docker-compose,kafka集群安装配置好

参考文章:

利用docker搭建kafka集群而且进行相应的实践-CSDN博客
这篇文章里面有另外两篇文章的链接,点进去就可以或许看到
(3)在windows上面,创建一个数据库mysql1(如果没有的话就须要创建),接着在这个数据库里面建一个表min_table

具体代码如下
create database if not exists mysql1; -- 解释符为‘-- '注意有个空格
use mysql1;
CREATE TABLE min_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    quantity INT NOT NULL,
    amount DOUBLE NOT NULL,
    UNIQUE KEY unique_timestamp (timestamp)
);
  1. create database if not exists mysql1; -- 注释符为‘-- '注意有个空格
  2. use mysql1;
  3. CREATE TABLE min_table (
  4.     id INT AUTO_INCREMENT PRIMARY KEY,
  5.     timestamp TIMESTAMP NOT NULL,
  6.     quantity INT NOT NULL,
  7.     amount DOUBLE NOT NULL,
  8.     UNIQUE KEY unique_timestamp (timestamp)
  9. );
复制代码
(4)接着在安装配置了flink的linux虚拟机上面安装好mysql

参考文章:黑马大数据学习笔记4-Hive部署和根本利用_黑马大数据 hive笔记-CSDN博客
 (5)然后同样的在linux虚拟机上面的mysql中创建一个数据库mysql1(如果没有的话就须要创建),接着在这个数据库里面建一个表min_table

具体代码如下
create database if not exists mysql1; -- 解释符为‘-- '注意有个空格
use mysql1;

CREATE TABLE min_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    quantity INT NOT NULL,
    amount DOUBLE NOT NULL,
    UNIQUE KEY unique_timestamp (timestamp)
);
  1. create database if not exists mysql1; -- 注释符为‘-- '注意有个空格
  2. use mysql1;
  3. CREATE TABLE min_table (
  4.     id INT AUTO_INCREMENT PRIMARY KEY,
  5.     timestamp TIMESTAMP NOT NULL,
  6.     quantity INT NOT NULL,
  7.     amount DOUBLE NOT NULL,
  8.     UNIQUE KEY unique_timestamp (timestamp)
  9. );
复制代码

(6)在idea里面新建一个Maven项目,名字叫做FlinkDemo然后往pom.xml中添加以下配置

  1. <dependencies>
  2.     <!-- Flink 的核心库 -->
  3.     <dependency>
  4.         <groupId>org.apache.flink</groupId>
  5.         <artifactId>flink-java</artifactId>
  6.         <version>1.18.0</version>
  7.     </dependency>
  8.     <dependency>
  9.         <groupId>org.apache.flink</groupId>
  10.         <artifactId>flink-streaming-java</artifactId>
  11.         <version>1.18.0</version>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>org.apache.flink</groupId>
  15.         <artifactId>flink-clients</artifactId>
  16.         <version>1.18.0</version>
  17.     </dependency>
  18.     <!-- Flink Kafka Connector -->
  19.     <dependency>
  20.         <groupId>org.apache.flink</groupId>
  21.         <artifactId>flink-connector-kafka</artifactId>
  22.         <version>3.0.1-1.18</version>
  23.     </dependency>
  24.     <dependency>
  25.         <groupId>org.apache.flink</groupId>
  26.         <artifactId>flink-connector-jdbc</artifactId>
  27.         <version>3.1.1-1.17</version>
  28.     </dependency>
  29.     <dependency>
  30.         <groupId>mysql</groupId>
  31.         <artifactId>mysql-connector-java</artifactId>
  32.         <version>8.0.33</version>
  33.     </dependency>
  34. </dependencies>
  35. <build>
  36.     <plugins>
  37.         <plugin>
  38.             <artifactId>maven-assembly-plugin</artifactId>
  39.             <configuration>
  40.                 <descriptorRefs>
  41.                     <descriptorRef>jar-with-dependencies</descriptorRef>
  42.                 </descriptorRefs>
  43.             </configuration>
  44.             <executions>
  45.                 <execution>
  46.                     <phase>package</phase>
  47.                     <goals>
  48.                         <goal>single</goal>
  49.                     </goals>
  50.                 </execution>
  51.             </executions>
  52.         </plugin>
  53.     </plugins>
  54. </build>
复制代码
这个和上面的是一个东西,就看你喜欢一键复制照旧分别复制了
<dependencies>
    <!-- Flink 的核心库 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>

    <!-- Flink Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>


</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(7)在该项目的com.examle目录下创建三个文件

     目录结构如下


DatabaseSink.java

  1. package com.example;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  4. import org.apache.flink.connector.jdbc.JdbcSink;
  5. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  6. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  7. import org.apache.flink.types.Row;
  8. import org.apache.flink.api.common.typeinfo.Types;
  9. import org.apache.flink.api.java.tuple.Tuple3;
  10. import java.sql.PreparedStatement;
  11. import java.sql.Timestamp;
  12. public class DatabaseSink {
  13.     private String url;
  14.     private String username;
  15.     private String password;
  16.     public DatabaseSink(String url, String username, String password) {
  17.         this.url = url;
  18.         this.username = username;
  19.         this.password = password;
  20.     }
  21.     public void addSink(DataStream<Tuple3<Timestamp, Long, Double>> stream) {
  22.         stream.addSink(JdbcSink.sink(
  23.                 "INSERT INTO min_table (timestamp, quantity, amount) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount)",
  24.                 (ps, t) -> {
  25.                     ps.setTimestamp(1, t.f0);
  26.                     ps.setLong(2, t.f1);
  27.                     ps.setDouble(3, t.f2);
  28.                 },
  29.                 new JdbcExecutionOptions.Builder()
  30.                         .withBatchSize(5000)
  31.                         .withBatchIntervalMs(200)
  32.                         .withMaxRetries(5)
  33.                         .build(),
  34.                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  35.                         .withUrl(this.url)
  36.                         .withDriverName("com.mysql.jdbc.Driver")
  37.                         .withUsername(this.username)
  38.                         .withPassword(this.password)
  39.                         .build()
  40.         ));
  41.     }
  42. }
复制代码
LocalFlinkTest.java

  1. package com.example;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. import org.apache.flink.api.common.functions.FlatMapFunction;
  5. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  6. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  7. import org.apache.flink.api.java.tuple.Tuple;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.connector.kafka.source.KafkaSource;
  10. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  16. import org.apache.flink.util.Collector;
  17. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  18. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  19. import org.apache.flink.connector.jdbc.JdbcSink;
  20. import org.apache.kafka.clients.consumer.OffsetResetStrategy;
  21. import java.sql.Timestamp;
  22. import java.text.SimpleDateFormat;
  23. import java.util.Date;
  24. import java.util.concurrent.TimeUnit;
  25. public class LocalFlinkTest {
  26.     public static void main(String[] args) throws Exception {
  27.         SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));
  28.         SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");
  29.         final StreamExecutionEnvironment env =
  30.                 StreamExecutionEnvironment.getExecutionEnvironment();
  31.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  32.                 333, // 尝试重启的次数
  33.                 org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟
  34.         ));
  35.         env.setRestartStrategy(RestartStrategies.noRestart());
  36.         KafkaSource<String> source = KafkaSource.<String>builder()
  37.                 .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址
  38.                 .setGroupId("testGroup") // 你的消费者组 ID
  39.                 .setTopics("foo") // 你的主题
  40.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  41.                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.
  42.                         LATEST)) // 从消费者组的最新偏移量开始消费
  43.                 .build();
  44.         DataStream<String> stream = env.fromSource(source,
  45.                 WatermarkStrategy.noWatermarks(), "Kafka Source");
  46. // flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
  47. // 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
  48. // 近 1 分钟与当天累计的总交易金额、交易数量
  49. //                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");
  50.         DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
  51.                 .filter(new FilterFunction<String>() {
  52.                     @Override
  53.                     public boolean filter(String value) throws Exception {
  54.                         // 假设文件的第一行是表头,这里跳过它
  55.                         return !value.startsWith("time");
  56.                     }
  57.                 })
  58.                 .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,
  59.                         Double>>() {
  60.                     @Override
  61.                     public void flatMap(String line, Collector<Tuple3<Timestamp, Long,
  62.                             Double>> out) {
  63.                         try {
  64.                             String[] fields = line.split(",");
  65.                             String s = fields[0];
  66. // 解析时间字符串后,将日期时间对象的秒字段设置为 0
  67.                             Date date = sdf.parse(s);
  68.                             Timestamp sqlTimestamp = new Timestamp(date.getTime());
  69.                             double price = Double.parseDouble(fields[3]);
  70.                             long quantity = Long.parseLong(fields[4]);
  71.                             double amount = price * quantity;
  72.                             out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
  73. // System.out.println(line);
  74.                         } catch (Exception e) {
  75.                             System.out.println(line);                        }
  76.                     }
  77.                 }); // 过滤掉解析失败的记录;
  78.         // 计算每 500 毫秒的数据
  79. // keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的
  80.         DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =
  81.                 transactionVolumes
  82.                         .keyBy(t -> t.f0)
  83.                         .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)
  84.                         ))
  85.                         .reduce((Tuple3<Timestamp,Long ,Double> value1,
  86.                                  Tuple3<Timestamp,Long ,Double> value2) -> {
  87. //                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));
  88.                             return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +
  89.                                     value2.f2);
  90.                         });
  91.         oneSecondAmounts.print();
  92.         DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");
  93.         dbSink.addSink(oneSecondAmounts);
  94.         env.execute("Kafka Flink Demo");
  95.     }
  96. }
复制代码
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");

这里的密码应该改成你自己的。(固然博主本人的是123456)
FlinkTest.java

  1. package com.example;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. import org.apache.flink.api.common.functions.FlatMapFunction;
  5. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  6. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  7. import org.apache.flink.api.java.tuple.Tuple;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.connector.kafka.source.KafkaSource;
  10. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  16. import org.apache.flink.util.Collector;
  17. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  18. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  19. import org.apache.flink.connector.jdbc.JdbcSink;
  20. import org.apache.kafka.clients.consumer.OffsetResetStrategy;
  21. import java.sql.Timestamp;
  22. import java.text.SimpleDateFormat;
  23. import java.util.Date;
  24. import java.util.concurrent.TimeUnit;
  25. public class FlinkTest {
  26.     public static void main(String[] args) throws Exception {
  27.         SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));
  28.         SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");
  29.         final StreamExecutionEnvironment env =
  30.                 StreamExecutionEnvironment.getExecutionEnvironment();
  31.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  32.                 333, // 尝试重启的次数
  33.                 org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟
  34.         ));
  35.         env.setRestartStrategy(RestartStrategies.noRestart());
  36.         KafkaSource<String> source = KafkaSource.<String>builder()
  37.                 .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址
  38.                 .setGroupId("testGroup") // 你的消费者组 ID
  39.                 .setTopics("foo") // 你的主题
  40.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  41.                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.
  42.                         LATEST)) // 从消费者组的最新偏移量开始消费
  43.                 .build();
  44.         DataStream<String> stream = env.fromSource(source,
  45.                 WatermarkStrategy.noWatermarks(), "Kafka Source");
  46. // flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
  47. // 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
  48. // 近 1 分钟与当天累计的总交易金额、交易数量
  49. //                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");
  50.         DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
  51.                 .filter(new FilterFunction<String>() {
  52.                     @Override
  53.                     public boolean filter(String value) throws Exception {
  54.                         // 假设文件的第一行是表头,这里跳过它
  55.                         return !value.startsWith("time");
  56.                     }
  57.                 })
  58.                 .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,
  59.                         Double>>() {
  60.                     @Override
  61.                     public void flatMap(String line, Collector<Tuple3<Timestamp, Long,
  62.                             Double>> out) {
  63.                         try {
  64.                             String[] fields = line.split(",");
  65.                             String s = fields[0];
  66. // 解析时间字符串后,将日期时间对象的秒字段设置为 0
  67.                             Date date = sdf.parse(s);
  68.                             Timestamp sqlTimestamp = new Timestamp(date.getTime());
  69.                             double price = Double.parseDouble(fields[3]);
  70.                             long quantity = Long.parseLong(fields[4]);
  71.                             double amount = price * quantity;
  72.                             out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
  73. // System.out.println(line);
  74.                         } catch (Exception e) {
  75.                             System.out.println(line);                        }
  76.                     }
  77.                 }); // 过滤掉解析失败的记录;
  78.         // 计算每 500 毫秒的数据
  79. // keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的
  80.         DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =
  81.                 transactionVolumes
  82.                         .keyBy(t -> t.f0)
  83.                         .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)
  84.                         ))
  85.                         .reduce((Tuple3<Timestamp,Long ,Double> value1,
  86.                                  Tuple3<Timestamp,Long ,Double> value2) -> {
  87. //                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));
  88.                             return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +
  89.                                     value2.f2);
  90.                         });
  91.         oneSecondAmounts.print();
  92.         DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");
  93.         dbSink.addSink(oneSecondAmounts);
  94.         env.execute("Kafka Flink Demo");
  95.     }
  96. }
复制代码
  1. DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");
复制代码
这里的密码和主机号(192.168.88.101)应该改成你自己的密码和主机号
2.开始实验,分为当地测试和flink测试

(1)启动node1,打开Finalshell,启动docker,启动kafka集群,flink集群

  1. systemctl start docker
  2. cd /export/server
  3. docker-compose -f kafka.yml up -d
  4. docker-compose -f flink.yml up -d
  5. docker ps
复制代码
结果如下


(2)先辈行当地测试(这里只须要用到kafka集群)

打开两个node1的窗口

在第二个窗口进入kafka2容器,启动消费者进程

代码
  1. docker exec -it kafka2 /bin/bash
复制代码
  1. cd /opt/bitnami/kafka/bin
复制代码
  1. kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
复制代码
 结果如下

进入idea,运行这个文件LocalFlinkTest.java


在第一个窗口进入kafka1容器,发送文件的前5行

[root@node1 server]# docker exec -it kafka1 /bin/bash
root@a2f7152188c1:/#  cd /opt/bitnami/kafka/bin
root@a2f7152188c1:/opt/bitnami/kafka/bin# head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
root@a2f7152188c1:/opt/bitnami/kafka/bin#
代码
  1. docker exec -it kafka1 /bin/bash
复制代码
  1. cd /opt/bitnami/kafka/bin
复制代码
  1. head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
复制代码


接着在idea里面查看


在mysql里查看


到这里,当地测试就已经乐成了!

(3)再进行flink测试,先在idea这里双击packge,然后去target目录看看有没有多出这两个文件(先运行文件FlinkTest.java先)

运行文件FlinkTest.java


在idea这里双击packge,然后去target目录看看有没有多出这两个文件 




进入网页node1:8081,上传这个名字更长的jar包



输入这个路径

D:\JetBrains\idea-project\FlinkDemo\target
(反正就是target目录的位置)


添加乐成后

点一下谁人玩意儿填入如下内容com.example.FlinkTest

这个com.example.FlinkTest是FlinkTest.java在项目中的路径
以及选择输入3



然后点击submit提交即可,结果显示正常运行


再回到node1的第一个窗口,

在这个位置
root@41d3910fe6c9:/opt/bitnami/kafka/bin#输入以下代码(kafka1的/opt/bitnami/kafka/bin目录下)来发个文件已往


代码
  1. cat /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
复制代码
任意点开一个,在监控参数中选择numRecordsInPerSecond可以查看每秒处理数据速度。



4.总结

启动

  1. systemctl start docker
复制代码
  1. cd /export/server
  2. docker-compose -f kafka.yml up -d
  3. docker-compose -f flink.yml up -d
  4. docker ps
复制代码
  1. cd /export/server
  2. docker-compose -f kafka.yml up -d
  3. docker-compose -f bigwork_flink.yml up -d
  4. docker ps
复制代码
  1. redis-server be_redis.conf
复制代码

关闭

  1. cd /export/server
  2. docker-compose -f flink.yml down
  3. docker-compose -f kafka.yml down
  4. docker ps
复制代码

  1. cd /export/server
  2. docker-compose -f bigwork_flink.yml down
  3. docker-compose -f kafka.yml down
  4. docker ps
复制代码
  1. systemctl stop docker
复制代码
可以新加一个bigwork_flink.yml,

  1. cd /export/server/
  2. vi bigwork_flink.yml
复制代码
内容如下

  1. version: '3'
  2. services:
  3.   jobmanager:
  4.     image: flink:latest
  5.     container_name: jobmanager
  6.     hostname: jobmanager
  7.     ports:
  8.       - "8081:8081"
  9.     command: jobmanager
  10.     environment:
  11.       - JOB_MANAGER_RPC_ADDRESS=jobmanager
  12.     volumes:
  13.       - /export/server/dockerflink:/example/data
  14.     networks:
  15.       - flink_network
  16.   taskmanager:
  17.     image: flink:latest
  18.     depends_on:
  19.       - jobmanager
  20.     ports:
  21.       - "8082-8085:8081"  # 确保端口范围足够
  22.     command: taskmanager
  23.     scale: 4  # 设置TaskManager的数量为4
  24.     environment:
  25.       - JOB_MANAGER_RPC_ADDRESS=jobmanager
  26.       - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4
  27.     networks:
  28.       - flink_network
  29. networks: #注意这里,要跟services配置项目对齐(即平级的意思)
  30.   flink_network:
  31.     driver: bridge
复制代码










免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

熊熊出没

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表