一、Storm集群构建
编写storm 与 zookeeper的yml文件
storm yml文件的编写
具体如下:
拉取Storm搭建需要的镜像,这里我选择镜像版本为 zookeeper:3.4.8 storm:1.0.0
键入命令:- docker pull zookeeper:3.4.8 docker pull storm:1.0.0
复制代码
storm镜像 获取
使用docker-compose 构建集群
在power shell中执行以下命令:
- docker-compose -f storm.yml up -d
复制代码
docker-compose 构建集群
在浏览器中打开localhost:8080 可以看到storm集群的详细情况
storm UI 展示
二、Storm统计任务
统计股票交易情况交易量和交易总金额 (数据文件存储在csv文件中)
编写DataSourceSpout类
DataSourceSpout类
编写bolt类
编写topology类
需要注意的是 Storm Java API 下有本地模型和远端模式
在本地模式下的调试不依赖于集群环境,可以进行简单的调试
如果需要使用生产模式,则需要将
1、 编写和自身业务相关的spout和bolt类,并将其打包成一个jar包
2、将上述的jar包放到客户端代码能读到的任何位置,
3、使用如下方式定义一个拓扑(Topology)
演示结果:
本地模式下的调试:
正在执行:
根据24小时
根据股票种类
生产模式:
向集群提交topology
三、核心计算bolt的代码
1.统计不同类型的股票交易量和交易总金额:- package bolt;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.topology.base.BaseRichBolt;
- import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;
-
- @SuppressWarnings("serial")
- public class TypeCountBolt extends BaseRichBolt {
-
- OutputCollector collector;
-
- Map<String,Integer> map = new HashMap<String, Integer>();
-
- Map<String,Float> map2 = new HashMap<String, Float>();
-
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
-
- }
-
- public void execute(Tuple input) {
- String line = input.getStringByField("line");
- String[] data = line.split(",");
- Integer count = map.get(data[2]);
- Float total_amount = map2.get(data[2]);
- if(count==null){
- count = 0;
- }
- if(total_amount==null){
- total_amount = 0.0f;
- }
- count++;
- total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
- map.put(data[2],count);
- map2.put(data[2],total_amount);
-
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
- Set<Map.Entry<String,Integer>> entrySet = map.entrySet();
- for(Map.Entry<String,Integer> entry :entrySet){
- System.out.println("交易量:");
- System.out.println(entry);
- }
- System.out.println();
- Set<Map.Entry<String,Float>> entrySet2 = map2.entrySet();
- for(Map.Entry<String,Float> entry :entrySet2){
- System.out.println("交易总金额:");
- System.out.println(entry);
- }
- }
-
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- }
复制代码
2. 统计不同每个小时的交易量和交易总金额- package bolt;
-
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.topology.base.BaseRichBolt;
- import org.apache.storm.tuple.Tuple;
-
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
-
- public class TimeCountBolt extends BaseRichBolt {
- OutputCollector collector;
-
- Map<Integer,Integer> map = new HashMap<Integer, Integer>();
-
- Map<Integer,Float> map2 = new HashMap<Integer, Float>();
-
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
-
- }
-
- public void execute(Tuple input) {
- String line = input.getStringByField("line");
- String[] data = line.split(",");
-
- Date date = new Date();
- SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
- try {
- date = dateFormat.parse(data[0]);
- } catch (ParseException e) {
- e.printStackTrace();
- }
-
- Integer count = map.get(date.getHours());
- Float total_amount = map2.get(date.getHours());
- if(count==null){
- count = 0;
- }
- if(total_amount==null){
- total_amount = 0.0f;
- }
- count++;
- total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
- map.put(date.getHours(),count);
- map2.put(date.getHours(),total_amount);
-
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
- Set<Map.Entry<Integer,Integer>> entrySet = map.entrySet();
- for(Map.Entry<Integer,Integer> entry :entrySet){
- System.out.println("交易量:");
- System.out.println(entry);
- }
- System.out.println();
- Set<Map.Entry<Integer,Float>> entrySet2 = map2.entrySet();
- for(Map.Entry<Integer,Float> entry :entrySet2){
- System.out.println("交易总金额:");
- System.out.println(entry);
- }
- }
-
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |