熊熊出没 发表于 2024-12-31 14:22:33

实验3-及时数据流处理-Flink

1.前期准备

(1)Flink底子环境安装

参考文章:
利用docker-compose来搭建flink集群-CSDN博客
显示为这样就乐成了
https://i-blog.csdnimg.cn/direct/ea4b738c342e45b5a75389c58500dffd.png
(2)把docker,docker-compose,kafka集群安装配置好

参考文章:

利用docker搭建kafka集群而且进行相应的实践-CSDN博客
这篇文章里面有另外两篇文章的链接,点进去就可以或许看到
(3)在windows上面,创建一个数据库mysql1(如果没有的话就须要创建),接着在这个数据库里面建一个表min_table

具体代码如下
create database if not exists mysql1; -- 解释符为‘-- '注意有个空格
use mysql1;
CREATE TABLE min_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    quantity INT NOT NULL,
    amount DOUBLE NOT NULL,
    UNIQUE KEY unique_timestamp (timestamp)
);
create database if not exists mysql1; -- 注释符为‘-- '注意有个空格

use mysql1;

CREATE TABLE min_table (

    id INT AUTO_INCREMENT PRIMARY KEY,

    timestamp TIMESTAMP NOT NULL,

    quantity INT NOT NULL,

    amount DOUBLE NOT NULL,

    UNIQUE KEY unique_timestamp (timestamp)

); (4)接着在安装配置了flink的linux虚拟机上面安装好mysql

参考文章:黑马大数据学习笔记4-Hive部署和根本利用_黑马大数据 hive笔记-CSDN博客
 (5)然后同样的在linux虚拟机上面的mysql中创建一个数据库mysql1(如果没有的话就须要创建),接着在这个数据库里面建一个表min_table

具体代码如下
create database if not exists mysql1; -- 解释符为‘-- '注意有个空格
use mysql1;

CREATE TABLE min_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    quantity INT NOT NULL,
    amount DOUBLE NOT NULL,
    UNIQUE KEY unique_timestamp (timestamp)
);
create database if not exists mysql1; -- 注释符为‘-- '注意有个空格

use mysql1;

CREATE TABLE min_table (

    id INT AUTO_INCREMENT PRIMARY KEY,

    timestamp TIMESTAMP NOT NULL,

    quantity INT NOT NULL,

    amount DOUBLE NOT NULL,

    UNIQUE KEY unique_timestamp (timestamp)

); https://i-blog.csdnimg.cn/direct/47da721868b3481c92a3846ac32bef65.png
(6)在idea里面新建一个Maven项目,名字叫做FlinkDemo然后往pom.xml中添加以下配置

<dependencies>
    <!-- Flink 的核心库 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.18.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.18.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.18.0</version>
    </dependency>

    <!-- Flink Kafka Connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.0.1-1.18</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>


</dependencies>
<build>
    <plugins>
      <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                        <goal>single</goal>
                  </goals>
                </execution>
            </executions>
      </plugin>
    </plugins>
</build> 这个和上面的是一个东西,就看你喜欢一键复制照旧分别复制了
<dependencies>
    <!-- Flink 的核心库 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>

    <!-- Flink Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>


</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(7)在该项目的com.examle目录下创建三个文件

     目录结构如下

https://i-blog.csdnimg.cn/direct/6fc9587f40cf4d8ca8c2cc23db2a1fbd.png
DatabaseSink.java

package com.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;

import java.sql.PreparedStatement;
import java.sql.Timestamp;

public class DatabaseSink {
    private String url;
    private String username;
    private String password;

    public DatabaseSink(String url, String username, String password) {
      this.url = url;
      this.username = username;
      this.password = password;
    }

    public void addSink(DataStream<Tuple3<Timestamp, Long, Double>> stream) {
      stream.addSink(JdbcSink.sink(
                "INSERT INTO min_table (timestamp, quantity, amount) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount)",
                (ps, t) -> {
                  ps.setTimestamp(1, t.f0);
                  ps.setLong(2, t.f1);
                  ps.setDouble(3, t.f2);
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(this.url)
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername(this.username)
                        .withPassword(this.password)
                        .build()
      ));
    }
} LocalFlinkTest.java

package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class LocalFlinkTest {
    public static void main(String[] args) throws Exception {
      SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));
      SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");
      final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                333, // 尝试重启的次数
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟
      ));
      env.setRestartStrategy(RestartStrategies.noRestart());
      KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址
                .setGroupId("testGroup") // 你的消费者组 ID
                .setTopics("foo") // 你的主题
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.
                        LATEST)) // 从消费者组的最新偏移量开始消费
                .build();
      DataStream<String> stream = env.fromSource(source,
                WatermarkStrategy.noWatermarks(), "Kafka Source");
// flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
// 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
// 近 1 分钟与当天累计的总交易金额、交易数量
//                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");

      DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
                .filter(new FilterFunction<String>() {
                  @Override
                  public boolean filter(String value) throws Exception {
                        // 假设文件的第一行是表头,这里跳过它
                        return !value.startsWith("time");
                  }
                })
                .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,
                        Double>>() {


                  @Override
                  public void flatMap(String line, Collector<Tuple3<Timestamp, Long,
                            Double>> out) {

                        try {
                            String[] fields = line.split(",");
                            String s = fields;
// 解析时间字符串后,将日期时间对象的秒字段设置为 0
                            Date date = sdf.parse(s);
                            Timestamp sqlTimestamp = new Timestamp(date.getTime());
                            double price = Double.parseDouble(fields);
                            long quantity = Long.parseLong(fields);
                            double amount = price * quantity;
                            out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
// System.out.println(line);
                        } catch (Exception e) {
                            System.out.println(line);                        }
                  }
                }); // 过滤掉解析失败的记录;

      // 计算每 500 毫秒的数据
// keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的
      DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =
                transactionVolumes
                        .keyBy(t -> t.f0)
                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)
                        ))
                        .reduce((Tuple3<Timestamp,Long ,Double> value1,
                                 Tuple3<Timestamp,Long ,Double> value2) -> {
//                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));
                            return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +
                                    value2.f2);
                        });
      oneSecondAmounts.print();
      DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");
      dbSink.addSink(oneSecondAmounts);
      env.execute("Kafka Flink Demo");

    }
}
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");

这里的密码应该改成你自己的。(固然博主本人的是123456)
FlinkTest.java

package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class FlinkTest {
    public static void main(String[] args) throws Exception {
      SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));
      SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");
      final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                333, // 尝试重启的次数
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟
      ));
      env.setRestartStrategy(RestartStrategies.noRestart());
      KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址
                .setGroupId("testGroup") // 你的消费者组 ID
                .setTopics("foo") // 你的主题
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.
                        LATEST)) // 从消费者组的最新偏移量开始消费
                .build();
      DataStream<String> stream = env.fromSource(source,
                WatermarkStrategy.noWatermarks(), "Kafka Source");
// flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
// 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
// 近 1 分钟与当天累计的总交易金额、交易数量
//                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");

      DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
                .filter(new FilterFunction<String>() {
                  @Override
                  public boolean filter(String value) throws Exception {
                        // 假设文件的第一行是表头,这里跳过它
                        return !value.startsWith("time");
                  }
                })
                .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,
                        Double>>() {


                  @Override
                  public void flatMap(String line, Collector<Tuple3<Timestamp, Long,
                            Double>> out) {

                        try {
                            String[] fields = line.split(",");
                            String s = fields;
// 解析时间字符串后,将日期时间对象的秒字段设置为 0
                            Date date = sdf.parse(s);
                            Timestamp sqlTimestamp = new Timestamp(date.getTime());
                            double price = Double.parseDouble(fields);
                            long quantity = Long.parseLong(fields);
                            double amount = price * quantity;
                            out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
// System.out.println(line);
                        } catch (Exception e) {
                            System.out.println(line);                        }
                  }
                }); // 过滤掉解析失败的记录;

      // 计算每 500 毫秒的数据
// keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的
      DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =
                transactionVolumes
                        .keyBy(t -> t.f0)
                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)
                        ))
                        .reduce((Tuple3<Timestamp,Long ,Double> value1,
                                 Tuple3<Timestamp,Long ,Double> value2) -> {
//                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));
                            return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +
                                    value2.f2);
                        });
      oneSecondAmounts.print();
      DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");
      dbSink.addSink(oneSecondAmounts);
      env.execute("Kafka Flink Demo");

    }
}
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456"); 这里的密码和主机号(192.168.88.101)应该改成你自己的密码和主机号
2.开始实验,分为当地测试和flink测试

(1)启动node1,打开Finalshell,启动docker,启动kafka集群,flink集群

systemctl start docker
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps 结果如下
https://i-blog.csdnimg.cn/direct/7fc9ae6976b141a7ad0c8810a7895908.png
https://i-blog.csdnimg.cn/direct/35bb007e1cac4548a9a058ea753d41c0.png
(2)先辈行当地测试(这里只须要用到kafka集群)

打开两个node1的窗口

在第二个窗口进入kafka2容器,启动消费者进程

代码
docker exec -it kafka2 /bin/bash cd /opt/bitnami/kafka/bin kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo  结果如下
https://i-blog.csdnimg.cn/direct/603c3cd2ab274909bed59a9c009e42e9.png
进入idea,运行这个文件LocalFlinkTest.java

https://i-blog.csdnimg.cn/direct/e7dcdf8d8a7044b099f9035425f2f188.png
在第一个窗口进入kafka1容器,发送文件的前5行

# docker exec -it kafka1 /bin/bash
root@a2f7152188c1:/#  cd /opt/bitnami/kafka/bin
root@a2f7152188c1:/opt/bitnami/kafka/bin# head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
root@a2f7152188c1:/opt/bitnami/kafka/bin#
代码
docker exec -it kafka1 /bin/bash cd /opt/bitnami/kafka/bin head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
https://i-blog.csdnimg.cn/direct/ad92a97a610240b0968e0cb6c2d83fc0.png
接着在idea里面查看

https://i-blog.csdnimg.cn/direct/1dba331ea5774ced95ed25f9e1e406f9.png
在mysql里查看

https://i-blog.csdnimg.cn/direct/02d5d349dcce4d19a046f00693fef5f2.png
到这里,当地测试就已经乐成了!

(3)再进行flink测试,先在idea这里双击packge,然后去target目录看看有没有多出这两个文件(先运行文件FlinkTest.java先)

运行文件FlinkTest.java

https://i-blog.csdnimg.cn/direct/f30db5c4cdba44679b3a4c44bedf3437.png
在idea这里双击packge,然后去target目录看看有没有多出这两个文件 

https://i-blog.csdnimg.cn/direct/53667f09356b416bb1ef305af9a78b96.png
https://i-blog.csdnimg.cn/direct/24b0412146e6444caeccfa1724a0f085.png
https://i-blog.csdnimg.cn/direct/a7d44c4935b140359fd7420291850019.png
进入网页node1:8081,上传这个名字更长的jar包

https://i-blog.csdnimg.cn/direct/59540e3a208649b2b5b7f95e83c8af31.png
https://i-blog.csdnimg.cn/direct/776339feb9aa48b680356856ad009a84.png
输入这个路径

D:\JetBrains\idea-project\FlinkDemo\target
(反正就是target目录的位置)

添加乐成后
https://i-blog.csdnimg.cn/direct/da3b3c6bc4ee4072a2f7da2c042b4603.png
点一下谁人玩意儿填入如下内容com.example.FlinkTest

这个com.example.FlinkTest是FlinkTest.java在项目中的路径
以及选择输入3

https://i-blog.csdnimg.cn/direct/d54fe4165e6f4a03b1a21440d65a03b9.png
https://i-blog.csdnimg.cn/direct/81a5024ea1d84536b8f3d59350295f50.png
然后点击submit提交即可,结果显示正常运行

https://i-blog.csdnimg.cn/direct/cf5fcaeb787647b1846f204fba97886e.png
再回到node1的第一个窗口,

在这个位置
root@41d3910fe6c9:/opt/bitnami/kafka/bin#输入以下代码(kafka1的/opt/bitnami/kafka/bin目录下)来发个文件已往

代码
cat /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo 任意点开一个,在监控参数中选择numRecordsInPerSecond可以查看每秒处理数据速度。
https://i-blog.csdnimg.cn/direct/3d18fe2ff2164a32a0bbc4f1141e871e.png
https://i-blog.csdnimg.cn/direct/8ad09e07472d49b38c19fad1c88d8514.png

4.总结

启动

systemctl start docker cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f bigwork_flink.yml up -d
docker ps redis-server be_redis.conf

关闭

cd /export/server
docker-compose -f flink.yml down
docker-compose -f kafka.yml down
docker ps
cd /export/server
docker-compose -f bigwork_flink.yml down
docker-compose -f kafka.yml down
docker ps systemctl stop docker 可以新加一个bigwork_flink.yml,

cd /export/server/
vi bigwork_flink.yml
内容如下

version: '3'
services:
jobmanager:
    image: flink:latest
    container_name: jobmanager
    hostname: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - /export/server/dockerflink:/example/data
    networks:
      - flink_network

taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    ports:
      - "8082-8085:8081"# 确保端口范围足够
    command: taskmanager
    scale: 4# 设置TaskManager的数量为4
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4
    networks:
      - flink_network
networks: #注意这里,要跟services配置项目对齐(即平级的意思)
flink_network:
    driver: bridge









免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 实验3-及时数据流处理-Flink