基于flink,kafka和vue的大数据股票看板

打印 上一主题 下一主题

主题 850|帖子 850|积分 2550

在向flink的网页中提交一个任务,点击Show Plan出现如下错误:
 进入linux查看日记信息:
cat flink-fei-standalonesession-0-hadoop102.log.1 
2022-03-27 01:00:56,287 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@192.168.10.103:38496] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@192.168.10.103:38496]] Caused by: [java.net.ConnectException: 拒绝毗连: /192.168.10.103:38496]
2022-03-27 01:00:57,803 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@192.168.10.104:43792] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2022-03-27 01:00:57,804 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink-metrics@192.168.10.104:43949] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2022-03-27 01:00:58,754 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2022-03-27 01:00:58,769 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-03-27 01:00:58,778 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Shutting down rest endpoint.
2022-03-27 01:00:58,780 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:45143
2022-03-27 01:00:59,096 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2022-03-27 01:00:59,096 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2022-03-27 01:00:59,097 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2022-03-27 01:00:59,115 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2022-03-27 01:00:59,144 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
2022-03-27 01:00:59,152 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
 通过观察是拒绝毗连,回到代码中看到原来着实监听一个端口输入的数据造成的错误
        //从参数中提取主机名和端标语
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostName = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");
 
        //读取文本流
        DataStreamSource<String> lineDataStream = env.socketTextStream(hostName, port);
 由于还是实验阶段,并没有真正的通过监听端口获取数据来计算,这里采用给定一个地点和端标语的方式来实现。即如下方式来实现:
DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop102", 7777);
重新编译并打包,将达成的jar包上传到flink的网页中。
将该填的信息填好,点击Show Plan弹出下面的页面:
下一步点击Submit提交任务执行,又出现错误:
通过查看异常信息知道:

 出现上面异常的原因是在linux上还没有执行如下命令:
nc -lk 7777
 在hadoop102的虚拟机上执行上面的一行命令,之后重新提交一次任务

这下可以大概正常运行了,之后在linux中输入一行数据,测试程序是否好用。 
linux输入一行数据用“ ”隔开

 程序正在运行:
 在TaskManager中查看执行结果
 从上面可以看到任务着实在个节点上运行的,这也是符合集群运行的特点的。
                        
原文链接:https://blog.csdn.net/weixin_46005650/article/details/123767403
  1. * 交易平台排名服务类
  2. * 负责统计和排名各个交易平台的交易量
  3. * 使用Flink进行流式计算,并将结果存储到Redis中的有序集合(ZSET)
  4. * 支持实时更新平台交易量排名
  5. */
  6. package com.example.service;
  7. import com.example.Utils.Env;
  8. import com.example.entity.StockTrade;
  9. import jakarta.annotation.PostConstruct;
  10. import jakarta.annotation.Resource;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.flink.api.common.functions.AggregateFunction;
  13. import org.apache.flink.api.common.functions.MapFunction;
  14. import org.apache.flink.api.java.tuple.Tuple2;
  15. import org.apache.flink.configuration.Configuration;
  16. import org.apache.flink.streaming.api.datastream.DataStream;
  17. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  18. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  19. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  20. import org.apache.flink.streaming.api.windowing.time.Time;
  21. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  22. import org.springframework.stereotype.Component;
  23. import redis.clients.jedis.Jedis;
  24. @Component
  25. @Slf4j
  26. public class TopTenPlatform {
  27.     /** 环境配置依赖注入 */
  28.     @Resource
  29.     private Env env_;
  30.     /**
  31.      * 初始化方法,在服务启动时自动执行
  32.      * 在新线程中启动Flink作业
  33.      */
  34.     @PostConstruct
  35.     public void init() {
  36.         new Thread(() -> {
  37.             try {
  38.                 startFlinkJob();
  39.             } catch (Exception e) {
  40.                 log.error("Error starting Flink job", e);
  41.             }
  42.         }).start();
  43.     }
  44.     /**
  45.      * 启动Flink作业
  46.      * 设置数据流处理管道,包括数据转换、窗口操作和数据聚合
  47.      * @throws Exception 如果作业启动失败
  48.      */
  49.     /*
  50.     * 这段代码是一个Apache Flink的流处理作业,它的作用是分析股票交易数据,并统计每秒钟每个交易平台的交易量。以下是代码的详细解释:
  51. 1. **获取执行环境**:
  52.    ```java
  53.    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  54.    ```
  55.    这行代码获取了Flink的`StreamExecutionEnvironment`,它是执行Flink作业的入口点。
  56. 2. **获取数据流**:
  57.    ```java
  58.    DataStream<StockTrade> tradeStream = env_.getEnv(env);
  59.    ```
  60.    这行代码从`env_`对象(应该是一个你自己定义的类,包含Flink环境配置)获取一个`DataStream`,这个流包含了`StockTrade`类型的数据。
  61. 3. **记录日志**:
  62.    ```java
  63.    log.info("数据源接入");
  64.    ```
  65.    这行代码记录了一条日志信息,表明数据源已经接入。
  66. 4. **转换数据流**:
  67.    ```java
  68.    tradeStream
  69.        .map(new MapFunction<StockTrade, Tuple2<String, Long>>() {
  70.            @Override
  71.            public Tuple2<String, Long> map(StockTrade value) throws Exception {
  72.                return new Tuple2<>(value.getTradePlatform(), value.getTradeVolume());
  73.            }
  74.        })
  75.    ```
  76.    这部分代码将`DataStream<StockTrade>`转换为`DataStream<Tuple2<String, Long>>`。`map`函数将每个`StockTrade`对象转换为一个包含平台名称和交易量的二元组(`Tuple2`)。
  77. 5. **按键分组**:
  78.    ```java
  79.    .keyBy(value -> value.f0)
  80.    ```
  81.    这行代码按二元组的第一个元素(平台名称)对数据流进行分组,为接下来的窗口操作准备。
  82. 6. **开窗操作**:
  83.    ```java
  84.    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  85.    ```
  86.    这行代码应用了一个基于事件时间的滚动窗口,窗口大小为1秒。这意味着每1秒钟的数据会被分到一个窗口中进行处理。
  87. 7. **聚合操作**:
  88.    ```java
  89.    .aggregate(new TradeAggregator())
  90.    ```
  91.    这行代码应用了一个自定义的聚合函数`TradeAggregator`,它将对每个窗口内的数据进行聚合操作,比如累加每个交易平台的交易量。
  92. 8. **添加Sink**:
  93.    ```java
  94.    .addSink(new AccumulatingRedisSink(redisConfig));
  95.    ```
  96.    这行代码将聚合结果输出到Redis数据库。`AccumulatingRedisSink`是一个自定义的Sink,它负责将聚合结果持久化到Redis。
  97. 9. **执行作业**:
  98.    ```java
  99.    System.out.println("Top 10 stock trade volume analysis started");
  100.    env.execute("Stock Trade Analysis");
  101.    ```
  102.    这两行代码启动了Flink作业。`env.execute`方法会根据提供的作业名称("Stock Trade Analysis")执行定义好的流处理作业。
  103. 总结来说,这段代码定义了一个Flink流处理作业,它从某个数据源获取股票交易数据,将数据转换为平台名称和交易量的二元组,然后按平台名称分组,对每组数据应用1秒的滚动窗口,聚合每个窗口内的交易量,并将结果输出到Redis数据库。这个作业可以帮助分析每个交易平台每秒钟的交易量。
  104.     * */
  105.     public void startFlinkJob() throws Exception {
  106.         // 获取Flink的流执行环境,这是执行Flink作业的配置环境
  107.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  108.         // 从env_对象获取DataStream,该对象是StockTrade类型的数据流
  109.         DataStream<StockTrade> tradeStream = env_.getEnv(env);
  110.         // 记录日志信息,表明数据源已经接入
  111.         log.info("数据源接入");
  112.         // 对DataStream进行处理,1小时窗口的操作
  113.         tradeStream
  114.                 // 将StockTrade对象转换为平台名称和交易量的二元组
  115.                 .map(new MapFunction<StockTrade, Tuple2<String, Long>>() {
  116.                     /**
  117.                      * 将StockTrade对象转换为平台名称和交易量的二元组
  118.                      * @param value 股票交易数据
  119.                      * @return 包含平台名称和交易量的二元组
  120.                      */
  121.                     @Override
  122.                     public Tuple2<String, Long> map(StockTrade value) throws Exception {
  123.                         // 返回一个包含平台名称和交易量的二元组
  124.                         return new Tuple2<>(value.getTradePlatform(), value.getTradeVolume());
  125.                     }
  126.                 })
  127.                 // 按照平台名称(二元组的第一个元素)对数据进行分组
  128.                 .keyBy(value -> value.f0)
  129.                 // 应用1秒的滚动事件时间窗口
  130.                 .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  131.                 // 对每个窗口内的数据进行聚合操作,使用自定义的TradeAggregator聚合函数
  132.                 .aggregate(new TTPLTradeAggregator())
  133.                 // 将聚合结果输出到Redis数据库,使用自定义的AccumulatingRedisSink作为Sink
  134.                 .addSink(new TTPLAccumulatingRedisSink(redisConfig));
  135.         // 在控制台打印日志,表明作业开始执行
  136.         System.out.println("Top 10 stock trade volume analysis started");
  137.         // 执行Flink作业,作业名为"Stock Trade Analysis"
  138.         env.execute("Stock Trade Analysis");
  139.     }
  140.     /** Redis连接配置 */
  141.     public FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
  142.             .setHost("localhost")
  143.             .setPort(6379)
  144.             .build();
  145.     /**
  146.      * Redis数据写入Sink
  147.      * 负责将平台交易量数据持久化到Redis的有序集合中
  148.      * 使用ZSET数据结构实现自动排序
  149.      */
  150.     public static class TTPLAccumulatingRedisSink extends RichSinkFunction<Tuple2<String, Long>> {
  151.         /** Redis客户端实例 */
  152.         private transient Jedis jedis;
  153.         /** Redis连接配置 */
  154.         private final FlinkJedisPoolConfig config;
  155.         /**
  156.          * 构造函数
  157.          * @param config Redis连接配置
  158.          */
  159.         public TTPLAccumulatingRedisSink(FlinkJedisPoolConfig config) {
  160.             this.config = config;
  161.         }
  162.         /**
  163.          * 初始化Redis连接
  164.          * @param parameters 配置参数
  165.          */
  166.         @Override
  167.         public void open(Configuration parameters) throws Exception {
  168.             super.open(parameters);
  169.             jedis = new Jedis(config.getHost(), config.getPort());
  170.         }
  171.         /**
  172.          * 关闭Redis连接
  173.          */
  174.         @Override
  175.         public void close() throws Exception {
  176.             if (jedis != null) {
  177.                 jedis.close();
  178.             }
  179.             super.close();
  180.         }
  181.         /**
  182.          * 处理每条平台交易量数据
  183.          * 将数据累加到Redis的有序集合中
  184.          * @param value 平台交易量数据
  185.          * @param context 上下文
  186.          */
  187.         @Override
  188.         public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
  189.             String hashKey = "stock_trade_platform"; // 存储股票交易量的键
  190.             String field = value.f0; // 使用股票平台作为字段
  191.             // 检查键的类型
  192.             String type = jedis.type(hashKey);
  193.             if (!"zset".equals(type)) {
  194.                 // 如果键不存在或类型不正确,删除旧的键
  195.                 jedis.del(hashKey);
  196.                 // 创建新的有序集合
  197.                 jedis.zadd(hashKey, 0, field); // 初始化一个空的有序集合
  198.             }
  199.             // 从 Redis 中获取现有值
  200.             Double existingVolume = jedis.zscore(hashKey, field);
  201.             if (existingVolume == null) {
  202.                 existingVolume = (double) 0L;
  203.             }
  204.             // 更新交易量
  205.             double updatedVolume = existingVolume + value.f1;
  206.             // 将更新后的值写回 Redis
  207.             jedis.zadd(hashKey, updatedVolume, field);
  208.             final long delayInterval = 100;
  209.             Thread.sleep(delayInterval);
  210.         }
  211.     }
  212.     /**
  213.      * 交易量聚合器
  214.      * 实现AggregateFunction接口,用于聚合计算平台交易量
  215.      */
  216.     public static class TTPLTradeAggregator implements AggregateFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {
  217.         /**
  218.          * 创建新的累加器
  219.          * @return 空的累加器
  220.          */
  221.         @Override
  222.         public Tuple2<String, Long> createAccumulator() {
  223.             return new Tuple2<>("", 0L);
  224.         }
  225.         /**
  226.          * 将新的交易数据添加到累加器中
  227.          * @param value 新的交易数据
  228.          * @param accumulator 当前累加器
  229.          * @return 更新后的累加器
  230.          */
  231.         @Override
  232.         public Tuple2<String, Long> add(Tuple2<String, Long> value, Tuple2<String, Long> accumulator) {
  233.             String stockCode = value.f0;
  234.             Long totalAmount = value.f1 + accumulator.f1;
  235.             return new Tuple2<>(stockCode, totalAmount);
  236.         }
  237.         /**
  238.          * 获取聚合结果
  239.          * @param accumulator 累加器
  240.          * @return 聚合结果
  241.          */
  242.         @Override
  243.         public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
  244.             return accumulator;
  245.         }
  246.         /**
  247.          * 合并两个累加器
  248.          * @param a 第一个累加器
  249.          * @param b 第二个累加器
  250.          * @return 合并后的累加器
  251.          */
  252.         @Override
  253.         public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
  254.             String stockCode = a.f0; // 假设两个 accumulator 的股票代码相同
  255.             Long totalAmount = a.f1 + b.f1;
  256.             return new Tuple2<>(stockCode, totalAmount);
  257.         }
  258.     }
  259. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表