在向flink的网页中提交一个任务,点击Show Plan出现如下错误:
进入linux查看日记信息:
cat flink-fei-standalonesession-0-hadoop102.log.1
2022-03-27 01:00:56,287 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@192.168.10.103:38496] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@192.168.10.103:38496]] Caused by: [java.net.ConnectException: 拒绝毗连: /192.168.10.103:38496]
2022-03-27 01:00:57,803 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@192.168.10.104:43792] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2022-03-27 01:00:57,804 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink-metrics@192.168.10.104:43949] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2022-03-27 01:00:58,754 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2022-03-27 01:00:58,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-03-27 01:00:58,778 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-03-27 01:00:58,780 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:45143
2022-03-27 01:00:59,096 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2022-03-27 01:00:59,096 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2022-03-27 01:00:59,097 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2022-03-27 01:00:59,115 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2022-03-27 01:00:59,144 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2022-03-27 01:00:59,152 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
通过观察是拒绝毗连,回到代码中看到原来着实监听一个端口输入的数据造成的错误
//从参数中提取主机名和端标语
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostName = parameterTool.get("host");
Integer port = parameterTool.getInt("port");
//读取文本流
DataStreamSource<String> lineDataStream = env.socketTextStream(hostName, port);
由于还是实验阶段,并没有真正的通过监听端口获取数据来计算,这里采用给定一个地点和端标语的方式来实现。即如下方式来实现:
DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop102", 7777);
重新编译并打包,将达成的jar包上传到flink的网页中。
将该填的信息填好,点击Show Plan弹出下面的页面:
下一步点击Submit提交任务执行,又出现错误:
通过查看异常信息知道:
出现上面异常的原因是在linux上还没有执行如下命令:
nc -lk 7777
在hadoop102的虚拟机上执行上面的一行命令,之后重新提交一次任务
这下可以大概正常运行了,之后在linux中输入一行数据,测试程序是否好用。
linux输入一行数据用“ ”隔开
程序正在运行:
在TaskManager中查看执行结果
从上面可以看到任务着实在个节点上运行的,这也是符合集群运行的特点的。
原文链接:https://blog.csdn.net/weixin_46005650/article/details/123767403
- * 交易平台排名服务类
- * 负责统计和排名各个交易平台的交易量
- * 使用Flink进行流式计算,并将结果存储到Redis中的有序集合(ZSET)
- * 支持实时更新平台交易量排名
- */
- package com.example.service;
- import com.example.Utils.Env;
- import com.example.entity.StockTrade;
- import jakarta.annotation.PostConstruct;
- import jakarta.annotation.Resource;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.flink.api.common.functions.AggregateFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
- import org.springframework.stereotype.Component;
- import redis.clients.jedis.Jedis;
- @Component
- @Slf4j
- public class TopTenPlatform {
- /** 环境配置依赖注入 */
- @Resource
- private Env env_;
- /**
- * 初始化方法,在服务启动时自动执行
- * 在新线程中启动Flink作业
- */
- @PostConstruct
- public void init() {
- new Thread(() -> {
- try {
- startFlinkJob();
- } catch (Exception e) {
- log.error("Error starting Flink job", e);
- }
- }).start();
- }
- /**
- * 启动Flink作业
- * 设置数据流处理管道,包括数据转换、窗口操作和数据聚合
- * @throws Exception 如果作业启动失败
- */
- /*
- * 这段代码是一个Apache Flink的流处理作业,它的作用是分析股票交易数据,并统计每秒钟每个交易平台的交易量。以下是代码的详细解释:
- 1. **获取执行环境**:
- ```java
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- ```
- 这行代码获取了Flink的`StreamExecutionEnvironment`,它是执行Flink作业的入口点。
- 2. **获取数据流**:
- ```java
- DataStream<StockTrade> tradeStream = env_.getEnv(env);
- ```
- 这行代码从`env_`对象(应该是一个你自己定义的类,包含Flink环境配置)获取一个`DataStream`,这个流包含了`StockTrade`类型的数据。
- 3. **记录日志**:
- ```java
- log.info("数据源接入");
- ```
- 这行代码记录了一条日志信息,表明数据源已经接入。
- 4. **转换数据流**:
- ```java
- tradeStream
- .map(new MapFunction<StockTrade, Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> map(StockTrade value) throws Exception {
- return new Tuple2<>(value.getTradePlatform(), value.getTradeVolume());
- }
- })
- ```
- 这部分代码将`DataStream<StockTrade>`转换为`DataStream<Tuple2<String, Long>>`。`map`函数将每个`StockTrade`对象转换为一个包含平台名称和交易量的二元组(`Tuple2`)。
- 5. **按键分组**:
- ```java
- .keyBy(value -> value.f0)
- ```
- 这行代码按二元组的第一个元素(平台名称)对数据流进行分组,为接下来的窗口操作准备。
- 6. **开窗操作**:
- ```java
- .window(TumblingEventTimeWindows.of(Time.seconds(1)))
- ```
- 这行代码应用了一个基于事件时间的滚动窗口,窗口大小为1秒。这意味着每1秒钟的数据会被分到一个窗口中进行处理。
- 7. **聚合操作**:
- ```java
- .aggregate(new TradeAggregator())
- ```
- 这行代码应用了一个自定义的聚合函数`TradeAggregator`,它将对每个窗口内的数据进行聚合操作,比如累加每个交易平台的交易量。
- 8. **添加Sink**:
- ```java
- .addSink(new AccumulatingRedisSink(redisConfig));
- ```
- 这行代码将聚合结果输出到Redis数据库。`AccumulatingRedisSink`是一个自定义的Sink,它负责将聚合结果持久化到Redis。
- 9. **执行作业**:
- ```java
- System.out.println("Top 10 stock trade volume analysis started");
- env.execute("Stock Trade Analysis");
- ```
- 这两行代码启动了Flink作业。`env.execute`方法会根据提供的作业名称("Stock Trade Analysis")执行定义好的流处理作业。
- 总结来说,这段代码定义了一个Flink流处理作业,它从某个数据源获取股票交易数据,将数据转换为平台名称和交易量的二元组,然后按平台名称分组,对每组数据应用1秒的滚动窗口,聚合每个窗口内的交易量,并将结果输出到Redis数据库。这个作业可以帮助分析每个交易平台每秒钟的交易量。
- * */
- public void startFlinkJob() throws Exception {
- // 获取Flink的流执行环境,这是执行Flink作业的配置环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 从env_对象获取DataStream,该对象是StockTrade类型的数据流
- DataStream<StockTrade> tradeStream = env_.getEnv(env);
- // 记录日志信息,表明数据源已经接入
- log.info("数据源接入");
- // 对DataStream进行处理,1小时窗口的操作
- tradeStream
- // 将StockTrade对象转换为平台名称和交易量的二元组
- .map(new MapFunction<StockTrade, Tuple2<String, Long>>() {
- /**
- * 将StockTrade对象转换为平台名称和交易量的二元组
- * @param value 股票交易数据
- * @return 包含平台名称和交易量的二元组
- */
- @Override
- public Tuple2<String, Long> map(StockTrade value) throws Exception {
- // 返回一个包含平台名称和交易量的二元组
- return new Tuple2<>(value.getTradePlatform(), value.getTradeVolume());
- }
- })
- // 按照平台名称(二元组的第一个元素)对数据进行分组
- .keyBy(value -> value.f0)
- // 应用1秒的滚动事件时间窗口
- .window(TumblingEventTimeWindows.of(Time.seconds(1)))
- // 对每个窗口内的数据进行聚合操作,使用自定义的TradeAggregator聚合函数
- .aggregate(new TTPLTradeAggregator())
- // 将聚合结果输出到Redis数据库,使用自定义的AccumulatingRedisSink作为Sink
- .addSink(new TTPLAccumulatingRedisSink(redisConfig));
- // 在控制台打印日志,表明作业开始执行
- System.out.println("Top 10 stock trade volume analysis started");
- // 执行Flink作业,作业名为"Stock Trade Analysis"
- env.execute("Stock Trade Analysis");
- }
- /** Redis连接配置 */
- public FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
- .setHost("localhost")
- .setPort(6379)
- .build();
- /**
- * Redis数据写入Sink
- * 负责将平台交易量数据持久化到Redis的有序集合中
- * 使用ZSET数据结构实现自动排序
- */
- public static class TTPLAccumulatingRedisSink extends RichSinkFunction<Tuple2<String, Long>> {
- /** Redis客户端实例 */
- private transient Jedis jedis;
- /** Redis连接配置 */
- private final FlinkJedisPoolConfig config;
- /**
- * 构造函数
- * @param config Redis连接配置
- */
- public TTPLAccumulatingRedisSink(FlinkJedisPoolConfig config) {
- this.config = config;
- }
- /**
- * 初始化Redis连接
- * @param parameters 配置参数
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- jedis = new Jedis(config.getHost(), config.getPort());
- }
- /**
- * 关闭Redis连接
- */
- @Override
- public void close() throws Exception {
- if (jedis != null) {
- jedis.close();
- }
- super.close();
- }
- /**
- * 处理每条平台交易量数据
- * 将数据累加到Redis的有序集合中
- * @param value 平台交易量数据
- * @param context 上下文
- */
- @Override
- public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
- String hashKey = "stock_trade_platform"; // 存储股票交易量的键
- String field = value.f0; // 使用股票平台作为字段
- // 检查键的类型
- String type = jedis.type(hashKey);
- if (!"zset".equals(type)) {
- // 如果键不存在或类型不正确,删除旧的键
- jedis.del(hashKey);
- // 创建新的有序集合
- jedis.zadd(hashKey, 0, field); // 初始化一个空的有序集合
- }
- // 从 Redis 中获取现有值
- Double existingVolume = jedis.zscore(hashKey, field);
- if (existingVolume == null) {
- existingVolume = (double) 0L;
- }
- // 更新交易量
- double updatedVolume = existingVolume + value.f1;
- // 将更新后的值写回 Redis
- jedis.zadd(hashKey, updatedVolume, field);
- final long delayInterval = 100;
- Thread.sleep(delayInterval);
- }
- }
- /**
- * 交易量聚合器
- * 实现AggregateFunction接口,用于聚合计算平台交易量
- */
- public static class TTPLTradeAggregator implements AggregateFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {
- /**
- * 创建新的累加器
- * @return 空的累加器
- */
- @Override
- public Tuple2<String, Long> createAccumulator() {
- return new Tuple2<>("", 0L);
- }
- /**
- * 将新的交易数据添加到累加器中
- * @param value 新的交易数据
- * @param accumulator 当前累加器
- * @return 更新后的累加器
- */
- @Override
- public Tuple2<String, Long> add(Tuple2<String, Long> value, Tuple2<String, Long> accumulator) {
- String stockCode = value.f0;
- Long totalAmount = value.f1 + accumulator.f1;
- return new Tuple2<>(stockCode, totalAmount);
- }
- /**
- * 获取聚合结果
- * @param accumulator 累加器
- * @return 聚合结果
- */
- @Override
- public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
- return accumulator;
- }
- /**
- * 合并两个累加器
- * @param a 第一个累加器
- * @param b 第二个累加器
- * @return 合并后的累加器
- */
- @Override
- public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
- String stockCode = a.f0; // 假设两个 accumulator 的股票代码相同
- Long totalAmount = a.f1 + b.f1;
- return new Tuple2<>(stockCode, totalAmount);
- }
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |