Storm 集群的搭建及其Java编程进行简单统计计算

打印 上一主题 下一主题

主题 667|帖子 667|积分 2001

一、Storm集群构建
编写storm 与 zookeeper的yml文件
 

storm yml文件的编写

具体如下:
  1. version: '2'
  2. services:
  3.   zookeeper1:
  4.     image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
  5.     container_name: zk1.cloud
  6.     environment:
  7.       - SERVER_ID=1
  8.       - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888
  9.       - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
  10.       - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  11.   zookeeper2:
  12.     image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
  13.     container_name: zk2.cloud
  14.     environment:
  15.       - SERVER_ID=2
  16.       - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
  17.       - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888
  18.       - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  19.   zookeeper3:
  20.     image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
  21.     container_name: zk3.cloud
  22.     environment:
  23.       - SERVER_ID=3
  24.       - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
  25.       - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
  26.       - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888
  27.   ui:
  28.     image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
  29.     command: ui -c nimbus.host=nimbus
  30.     environment:
  31.       - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
  32.     restart: always
  33.     container_name: ui
  34.     ports:
  35.       - 8080:8080
  36.     depends_on:
  37.       - nimbus
  38.   nimbus:
  39.     image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
  40.     command: nimbus -c nimbus.host=nimbus
  41.     restart: always
  42.     environment:
  43.       - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
  44.     container_name: nimbus
  45.     ports:
  46.       - 6627:6627
  47.   supervisor:
  48.     image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
  49.     command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]
  50.     restart: always
  51.     environment:
  52.       - affinity:role!=supervisor
  53.       - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
  54.     depends_on:
  55.       - nimbus
  56. networks:
  57.   default:
  58.     external:
  59.       name: zk-net
复制代码
 
 
拉取Storm搭建需要的镜像,这里我选择镜像版本为 zookeeper:3.4.8  storm:1.0.0
键入命令:
  1. docker pull zookeeper:3.4.8  docker pull storm:1.0.0
复制代码
 

storm镜像 获取

使用docker-compose 构建集群
在power shell中执行以下命令:
 
  1. docker-compose -f storm.yml up -d
复制代码
 

                                                                              docker-compose 构建集群
在浏览器中打开localhost:8080 可以看到storm集群的详细情况
 

storm UI 展示

二、Storm统计任务
统计股票交易情况交易量和交易总金额   (数据文件存储在csv文件中)
编写DataSourceSpout类
 

DataSourceSpout类

编写bolt类
 
 
 
编写topology类
 
 
需要注意的是 Storm Java API 下有本地模型和远端模式
在本地模式下的调试不依赖于集群环境,可以进行简单的调试
如果需要使用生产模式,则需要将
1、 编写和自身业务相关的spout和bolt类,并将其打包成一个jar包
 
2、将上述的jar包放到客户端代码能读到的任何位置,
 
3、使用如下方式定义一个拓扑(Topology)
 
 
演示结果:
本地模式下的调试:
 
正在执行:
 

根据24小时

 
 

根据股票种类

 
 
生产模式:
 

向集群提交topology

                                                       
 
 
 
三、核心计算bolt的代码
1.统计不同类型的股票交易量和交易总金额:
  1. package bolt;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.Set;
  6. import org.apache.storm.task.OutputCollector;
  7. import org.apache.storm.task.TopologyContext;
  8. import org.apache.storm.topology.OutputFieldsDeclarer;
  9. import org.apache.storm.topology.base.BaseRichBolt;
  10. import org.apache.storm.tuple.Tuple;
  11. import org.apache.storm.tuple.Values;
  12. @SuppressWarnings("serial")
  13. public class TypeCountBolt extends BaseRichBolt {
  14.     OutputCollector collector;
  15.     Map<String,Integer> map = new HashMap<String, Integer>();
  16.     Map<String,Float> map2 = new HashMap<String, Float>();
  17.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  18.         this.collector = collector;
  19.     }
  20.     public void execute(Tuple input) {
  21.         String line = input.getStringByField("line");
  22.         String[] data = line.split(",");
  23.         Integer count = map.get(data[2]);
  24.         Float total_amount = map2.get(data[2]);
  25.         if(count==null){
  26.             count = 0;
  27.         }
  28.         if(total_amount==null){
  29.             total_amount = 0.0f;
  30.         }
  31.         count++;
  32.         total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
  33.         map.put(data[2],count);
  34.         map2.put(data[2],total_amount);
  35.         System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
  36.         Set<Map.Entry<String,Integer>> entrySet = map.entrySet();
  37.         for(Map.Entry<String,Integer> entry :entrySet){
  38.             System.out.println("交易量:");
  39.             System.out.println(entry);
  40.         }
  41.         System.out.println();
  42.         Set<Map.Entry<String,Float>> entrySet2 = map2.entrySet();
  43.         for(Map.Entry<String,Float> entry :entrySet2){
  44.             System.out.println("交易总金额:");
  45.             System.out.println(entry);
  46.         }
  47.     }
  48.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  49.     }
  50. }
复制代码
 
 
2. 统计不同每个小时的交易量和交易总金额
  1. package bolt;
  2. import org.apache.storm.task.OutputCollector;
  3. import org.apache.storm.task.TopologyContext;
  4. import org.apache.storm.topology.OutputFieldsDeclarer;
  5. import org.apache.storm.topology.base.BaseRichBolt;
  6. import org.apache.storm.tuple.Tuple;
  7. import java.text.ParseException;
  8. import java.text.SimpleDateFormat;
  9. import java.util.Date;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.Set;
  13. public  class TimeCountBolt extends BaseRichBolt {
  14.     OutputCollector collector;
  15.     Map<Integer,Integer> map = new HashMap<Integer, Integer>();
  16.     Map<Integer,Float> map2 = new HashMap<Integer, Float>();
  17.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  18.         this.collector = collector;
  19.     }
  20.     public void execute(Tuple input) {
  21.         String line = input.getStringByField("line");
  22.         String[] data = line.split(",");
  23.         Date date = new Date();
  24.         SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
  25.         try {
  26.             date = dateFormat.parse(data[0]);
  27.         } catch (ParseException e) {
  28.             e.printStackTrace();
  29.         }
  30.         Integer count = map.get(date.getHours());
  31.         Float total_amount = map2.get(date.getHours());
  32.         if(count==null){
  33.             count = 0;
  34.         }
  35.         if(total_amount==null){
  36.             total_amount = 0.0f;
  37.         }
  38.         count++;
  39.         total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
  40.         map.put(date.getHours(),count);
  41.         map2.put(date.getHours(),total_amount);
  42.         System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
  43.         Set<Map.Entry<Integer,Integer>> entrySet = map.entrySet();
  44.         for(Map.Entry<Integer,Integer> entry :entrySet){
  45.             System.out.println("交易量:");
  46.             System.out.println(entry);
  47.         }
  48.         System.out.println();
  49.         Set<Map.Entry<Integer,Float>> entrySet2 = map2.entrySet();
  50.         for(Map.Entry<Integer,Float> entry :entrySet2){
  51.             System.out.println("交易总金额:");
  52.             System.out.println(entry);
  53.         }
  54.     }
  55.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  56.     }
  57. }
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用多少眼泪才能让你相信

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表