使用Flink进行股票盘算

打印 上一主题 下一主题

主题 651|帖子 651|积分 1953

使用Flink进行股票盘算


  
股票均匀代价盘算

代码详解

这四个类一起实现了对股票均匀代价的盘算处理,并展示了Apache Flink在实时流处理中的应用。
类名功能Stock表示股票交易数据的实体类StockStream创建流处理情况,读取股票交易数据,并盘算每分钟股票均匀代价的类ResultWindowFunction汇总每分钟内股票均匀代价的盘算结果,并输出为字符串的类AvgStockAggregateFunction盘算每个股票代码在每分钟内的均匀交易代价的类 Stock

股票代价的实体类
根据数据来看:
US2.AAPL,20200108,093003,297.260000000,100
可以分析为
股票名字交易日期交易时间股票代价股票数量US2.AAPL20200108093003297.260000000100 那么定义一个股票实体类
  1. package com.lfl.bigwork;
  2. /**
  3. * @author 叶星痕
  4. * @data 2024/6/14 上午8:56
  5. * 文件名 : Stock
  6. * 描述 : 股票实体类
  7. */
  8. public class Stock {
  9.     private String stockCode; // 股票的唯一代码或符号
  10.     private String tradeDate; // 交易发生的日期
  11.     private String tradeTime; // 交易发生的具体时间
  12.     private double price;     // 交易时的股票价格
  13.     private int volume;       // 交易的股票数量
  14.     public Stock() {
  15.     }
  16.     public Stock(String stockCode, String tradeDate, String tradeTime, double price, int volume) {
  17.         this.stockCode = stockCode;
  18.         this.tradeDate = tradeDate;
  19.         this.tradeTime = tradeTime;
  20.         this.price = price;
  21.         this.volume = volume;
  22.     }
  23.     public String getStockCode() {
  24.         return stockCode;
  25.     }
  26.     public void setStockCode(String stockCode) {
  27.         this.stockCode = stockCode;
  28.     }
  29.     public String getTradeDate() {
  30.         return tradeDate;
  31.     }
  32.     public void setTradeDate(String tradeDate) {
  33.         this.tradeDate = tradeDate;
  34.     }
  35.     public String getTradeTime() {
  36.         return tradeTime;
  37.     }
  38.     public void setTradeTime(String tradeTime) {
  39.         this.tradeTime = tradeTime;
  40.     }
  41.     public double getPrice() {
  42.         return price;
  43.     }
  44.     public void setPrice(double price) {
  45.         this.price = price;
  46.     }
  47.     public int getVolume() {
  48.         return volume;
  49.     }
  50.     public void setVolume(int volume) {
  51.         this.volume = volume;
  52.     }
  53.     @Override
  54.     public String toString() {
  55.         return "Stock{" +
  56.                 "stockCode='" + stockCode + '\'' +
  57.                 ", tradeDate='" + tradeDate + '\'' +
  58.                 ", tradeTime='" + tradeTime + '\'' +
  59.                 ", price=" + price +
  60.                 ", volume=" + volume +
  61.                 '}';
  62.     }
  63. }
复制代码
StockStream

首先,定义了一个名为 StockStream 的类,并在该类中定义了一个 main 方法作为程序的入口点。
在 main 方法中,首先获取了一个 StreamExecutionEnvironment 对象,这是 Flink 流处理的上下文情况。
然后,使用 readTextFile 方法从一个名为 “input/stock.txt” 的文本文件中读取数据,返回一个 DataStreamSource<String> 对象。
接着,使用 map 方法将每一行字符串转换为 Stock 对象,并使用 assignTimestampsAndWatermarks 方法为每个事件分配时间戳和水印。
然后,使用 keyBy 方法按照股票代码进行分组,返回一个 KeyedStream<Stock, String> 对象。
接着,使用 window 方法定义了一个滚动窗口,窗口巨细为 60 秒,然后使用 aggregate 方法聚合窗口内的数据,这里传入了 AvgStockAggregateFunction 和 ResultWindowFunction 两个函数,前者用于盘算均匀代价,后者用于格式化输出。
最后,使用 print 方法将结果打印到控制台,然后调用 StreamExecutionEnvironment 的 execute 方法启动流处理。
parseTimeToSeconds 是一个辅助方法,用于将形式为 “HHmmss” 的时间字符串解析为一天中的秒数。
这个程序的主要目的是读取股票数据,按照股票代码进行分组,然后在每个 60 秒的窗口内盘算每种股票的均匀代价,并打印到控制台。

  • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:获取 Flink 流处理的情况。
  • env.setParallelism(1);:这是设置并行度为 1,设置在一个线程中大概后续输出到一个同一个文件中。
  • DataStreamSource<String> source = env.readTextFile("input/stock.txt");
    :这是从一个名为 “input/stock.txt” 的文本文件中读取数据,返回一个 DataStreamSource<String> 对象。每一行字符串就是一个数据项。
  • DataStream<Stock> stockStream = source.map(...):这是将每一行字符串转换为 Stock 对象。转换的逻辑是先将字符串按照逗号分割,然后取出各个字段的值,创建 Stock 对象。
  • .assignTimestampsAndWatermarks(WatermarkStrategy.<Stock>forMonotonousTimestamps()...):这是为每个事件分配时间戳和水印。时间戳是事件的发生时间,水印是用于处理事件时间乱序的机制。
  • KeyedStream<Stock, String> keyedStream = stockStream.keyBy((KeySelector<Stock, String>) Stock::getStockCode);:这是按照股票代码进行分组,返回一个 KeyedStream<Stock, String> 对象。每个股票代码对应的所有 Stock 对象会被分到同一个组。
  • SingleOutputStreamOperator<String> result = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(60)))...:这是定义了一个滚动窗口,窗口巨细为 60 秒,然后聚合窗口内的数据。这里传入了 AvgStockAggregateFunction 和 ResultWindowFunction 两个函数,前者用于盘算均匀代价,后者用于格式化输出。
  • result.print();:将结果打印到控制台。
  • result.writeAsText("output/avgStock.txt"):将均匀代价写入到avgStock.txt文件内。
  • private static long parseTimeToSeconds(String time) {...}:这是一个辅助方法,用于将形式为 “HHmmss” 的时间字符串解析为一天中的秒数。这个方法被assignTimestampsAndWatermarks 方法调用,用于分配时间戳。
  1. package com.lfl.bigwork;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.java.functions.KeySelector;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.KeyedStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  11. import org.apache.flink.streaming.api.windowing.time.Time;
  12. /**
  13. * @author 叶星痕
  14. * @data 2024/6/14 下午12:21
  15. * 文件名 : ResultWindowFunction
  16. * 描述 : 主要的执行程序
  17. */
  18. public class StockStream {
  19.     public static void main(String[] args) throws Exception {
  20.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         env.setParallelism(1);
  22.         DataStreamSource<String> source = env.readTextFile("input/stock.txt");
  23.         DataStream<Stock> stockStream = source.map((MapFunction<String, Stock>) value -> {
  24.             String[] fields = value.split(",");
  25.             if (fields.length < 5) {
  26.                 throw new RuntimeException("Invalid input data: " + value);
  27.             }
  28.             return new Stock(fields[0], fields[1], fields[2], Double.parseDouble(fields[3]), Integer.parseInt(fields[4]));
  29.         }).assignTimestampsAndWatermarks(WatermarkStrategy
  30.                 .<Stock>forMonotonousTimestamps()
  31.                 .withTimestampAssigner((event, timestamp) -> parseTimeToSeconds(event.getTradeTime())));
  32.         KeyedStream<Stock, String> keyedStream = stockStream.keyBy((KeySelector<Stock, String>) Stock::getStockCode);
  33.         SingleOutputStreamOperator<String> result = keyedStream
  34.                 .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  35.                 .aggregate(new AvgStockAggregateFunction(), new ResultWindowFunction());
  36.         result.print();
  37.         result.writeAsText("output/avgStock.txt");
  38.         env.execute();
  39.     }
  40.     //将形式为“HHmmss”的时间字符串解析为一天中的秒
  41.     private static long parseTimeToSeconds(String time) {
  42.         int hours = Integer.parseInt(time.substring(0, 2));
  43.         int minutes = Integer.parseInt(time.substring(2, 4));
  44.         int seconds = Integer.parseInt(time.substring(4, 6));
  45.         return (hours * 3600L + minutes * 60L + seconds) * 1000L;
  46.     }
  47. }
复制代码
ResultWindowFunction

定义了一个名为 ResultWindowFunction 的类,这个类实现了 WindowFunction 接口。
接口的四个类型参数分别代表:
输入类型输出类型键类型窗口类型DoubleStringStringTimeWindow 在这个类中,实现了 apply 方法,这是窗口函数的核心方法,它会在每个窗口结束时被调用。方法参数包罗:
键窗口输入数据输出网络器swindowinput (Double 类型迭代器)out 在 apply 方法中,首先获取了输入数据的第一个元素作为均匀代价,然后通过输出网络器 out 发出一个格式化的字符串,这个字符串包罗了股票代码宁静均代价。
  1. package com.lfl.bigwork;
  2. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  3. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  4. import org.apache.flink.util.Collector;
  5. /**
  6. * @author 叶星痕
  7. * @data 2024/6/14 下午12:20
  8. * 文件名 : ResultWindowFunction
  9. * 描述 : 更好的输出
  10. */
  11. public class ResultWindowFunction implements WindowFunction<Double, String, String, TimeWindow> {
  12.     @Override
  13.     public void apply(String s, TimeWindow window, Iterable<Double> input, Collector<String> out)  {
  14.         Double averagePrice = input.iterator().next();
  15.         out.collect("股票代码:" + s + ",平均价格:" + averagePrice);
  16.     }
  17. }
复制代码
AvgStockAggregateFunction

这个 代码定义了一个名为 AvgStockAggregateFunction 的聚合函数,该函数用于盘算股票的均匀代价。
首先,这个类实现了 AggregateFunction 接口,这个接口有三个类型参数:
输入类型累加器类型输出类型StockTuple2<Double, Integer>Double 在这个类中,实现了 AggregateFunction 接口的五个方法:

  • createAccumulator():创建一个新的累加器,这里是一个包罗两个元素的元组,第一个元素是总代价(初始化为 0.0),第二个元素是总数量(初始化为 0)。
  • add(Stock value, Tuple2<Double, Integer> accumulator):将输入数据添加到累加器,这里是将股票的代价乘以数量加到总代价上,将股票的数量加到总数量上。
  • getResult(Tuple2<Double, Integer> accumulator):从累加器获取结果,这里是盘算均匀代价,即总代价除以总数量。
  • merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b):归并两个累加器,这里是将两个累加器的总代价和总数量分别相加。
  1. package com.lfl.bigwork;
  2. import org.apache.flink.api.common.functions.AggregateFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. /**
  5. * @author 叶星痕
  6. * @data 2024/6/14 下午12:20
  7. * 文件名 : ResultWindowFunction
  8. * 描述 : AggregateFunction的实现
  9. */
  10. public  class AvgStockAggregateFunction implements AggregateFunction<Stock, Tuple2<Double, Integer>, Double> {
  11.     @Override
  12.     public Tuple2<Double, Integer> createAccumulator() {
  13.         return Tuple2.of(0.0, 0);
  14.     }
  15.     @Override
  16.     public Tuple2<Double, Integer> add(Stock value, Tuple2<Double, Integer> accumulator) {
  17.         return Tuple2.of(accumulator.f0 + value.getPrice() * value.getVolume(), accumulator.f1 + value.getVolume());
  18.     }
  19.     @Override
  20.     public Double getResult(Tuple2<Double, Integer> accumulator) {
  21.         return accumulator.f0 / accumulator.f1;
  22.     }
  23.     @Override
  24.     public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
  25.         return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
  26.     }
  27. }
复制代码
流程文档

本文主要介绍怎样使用Apache Flink的AggregateFunction接口实现对股票均匀代价的盘算。整个流程包罗读取股票交易数据,对数据进行处理和聚合,最后输出每个股票代码在每分钟内的均匀交易代价。
1. 创建流处理情况

首先,我们需要创建一个StreamExecutionEnvironment对象,这是所有Flink程序的基础。然后,设置并行度为1,表示程序的并行级别。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
复制代码
2. 读取数据

使用env.readTextFile方法从文本文件中读取数据。这里我们假设数据文件名为"input/stock.txt"。
  1. DataStreamSource<String> source = env.readTextFile("input/stock.txt");
复制代码
3. 数据处理

接着,使用map函数将读取的每行数据转换为Stock对象,并使用assignTimestampsAndWatermarks方法分配时间戳和水印。其中,我们使用了WatermarkStrategy.forMonotonousTimestamps策略,表示事件时间戳是单调递增的。
  1. DataStream<Stock> stockStream = source.map(value -> {
  2.     String[] fields = value.split(",");
  3.     if (fields.length < 5) {
  4.         throw new RuntimeException("Invalid input data: " + value);
  5.     }
  6.     return new Stock(fields[0], fields[1], fields[2], Double.parseDouble(fields[3]), Integer.parseInt(fields[4]));
  7. }).assignTimestampsAndWatermarks(WatermarkStrategy
  8.         .<Stock>forMonotonousTimestamps()
  9.         .withTimestampAssigner((event, timestamp) -> parseTimeToSeconds(event.getTradeTime())));
复制代码
4. 数据分区

使用keyBy方法按照股票代码进行分区,这样每个股票代码的数据都会被分到同一个分区进行处理。
  1. KeyedStream<Stock, String> keyedStream = stockStream.keyBy(Stock::getStockCode);
复制代码
5. 数据聚合

在每个分区上,我们定义一个滑动窗口,窗口巨细为60秒,然后使用AggregateFunction进行聚合盘算。我们定义了一个AvgStockAggregateFunction,用于盘算每个股票代码的均匀代价。
  1. SingleOutputStreamOperator<String> result = keyedStream
  2.     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  3.     .aggregate(new AvgStockAggregateFunction(), new ResultWindowFunction());
复制代码
6. 结果输出

最后,我们将盘算结果打印出来,并实行任务。
  1. result.print();
  2. env.execute();
复制代码
以上就是使用AggregateFunction盘算股票均匀代价的整个流程。通过这个流程,我们可以实时盘算每个股票代码在每分钟内的均匀交易代价,为股票交易提供有代价的信息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

没腿的鸟

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表