用多少眼泪才能让你相信 posted on 2024-2-23 01:00:59

Building a Storm Cluster and Using Its Java API for Simple Statistical Computation

I. Building the Storm cluster
Write the yml file for Storm and ZooKeeper
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201235743-1205495468.png
Writing the storm yml file
The full file is as follows:
version: '2'

services:

  zookeeper1:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk1.cloud
    environment:
      - SERVER_ID=1
      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper2:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk2.cloud
    environment:
      - SERVER_ID=2
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper3:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk3.cloud
    environment:
      - SERVER_ID=3
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888

  ui:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: ui -c nimbus.host=nimbus
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    restart: always
    container_name: ui
    ports:
      - 8080:8080
    depends_on:
      - nimbus

  nimbus:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: nimbus -c nimbus.host=nimbus
    restart: always
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    container_name: nimbus
    ports:
      - 6627:6627

  supervisor:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=
    restart: always
    environment:
      - affinity:role!=supervisor
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    depends_on:
      - nimbus

networks:
  default:
    external:
      name: zk-net
 
Pull the images needed to build Storm. Here I chose the image versions zookeeper:3.4.8 and storm:1.0.0.
Enter the commands:
docker pull zookeeper:3.4.8
docker pull storm:1.0.0
https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201235688-1070797795.png
Fetching the storm image
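Build the cluster with docker-compose
The compose file declares zk-net as an external network, so that network must already exist (for example, created with docker network create zk-net). Then run the following command in PowerShell:

docker-compose -f storm.yml up -d
https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201235729-1337881808.png
Building the cluster with docker-compose
Open localhost:8080 in a browser to see the details of the Storm cluster
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201235751-846318362.png
The Storm UI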
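II. The Storm statistics job
Compute the trade volume and total trade amount of stock transactions (the data is stored in a CSV file)
Write the DataSourceSpout class
https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201405793-2072279047.png
The DataSourceSpout class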
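The spout is shown only as a screenshot. As a rough sketch of the part that does not depend on Storm, the class below hands out one CSV line per call, which is the pattern a spout's nextTuple() typically follows before emitting the line (the bolts later read it from a field named "line"). The class name CsvLineFeeder, its method names, and the sample rows are hypothetical; wiring it into an actual spout is left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical helper: returns one line per call, like a spout's nextTuple(). */
class CsvLineFeeder {
    private final BufferedReader reader;

    CsvLineFeeder(Reader source) {
        this.reader = new BufferedReader(source);
    }

    /** Returns the next non-empty line, or null once the source is exhausted. */
    String nextLine() {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    return line;
                }
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample rows; a real run would wrap a FileReader on the CSV file.
        CsvLineFeeder feeder = new CsvLineFeeder(new StringReader("a,1\n\nb,2\n"));
        String line;
        while ((line = feeder.nextLine()) != null) {
            // In a spout this is where the line would be emitted as the "line" field.
            System.out.println(line);
        }
    }
}
```

Returning null when the file is exhausted lets the caller simply do nothing on later calls, which suits a method that the framework keeps invoking in a loop.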
Write the bolt classes
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201439832-684221282.png
Write the topology class
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201502036-763100921.png
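Note that the Storm Java API has a local mode and a remote mode.
Debugging in local mode does not depend on a cluster environment, so it is suited to simple debugging.
To run in production mode, the following steps are needed:
1. Write the spout and bolt classes for your own business logic and package them into a jar
 
2. Put that jar anywhere the client code can read it
 
3. Define a topology as follows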
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201527232-1773089695.png
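One common convention for supporting both modes from a single main class is to switch on whether a topology name was passed on the command line. The sketch below shows only that decision; the Storm calls appear as comments, and the class name, the enum, and the "local-test" topology name are hypothetical rather than taken from the screenshots.

```java
/** Hypothetical launcher logic: pick local or remote mode from the arguments. */
class SubmitModeChooser {
    enum Mode { LOCAL, REMOTE }

    /** Remote mode when a topology name is given as the first argument, else local. */
    static Mode choose(String[] args) {
        return (args != null && args.length > 0) ? Mode.REMOTE : Mode.LOCAL;
    }

    public static void main(String[] args) {
        switch (choose(args)) {
            case REMOTE:
                // On a cluster: StormSubmitter.submitTopology(args[0], conf, topology)
                System.out.println("remote mode, topology name: " + args[0]);
                break;
            default:
                // In the IDE: new LocalCluster().submitTopology("local-test", conf, topology)
                System.out.println("local mode");
        }
    }
}
```

With this layout the same jar can be run directly for debugging and submitted with a topology name for the cluster, without changing the code in between.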
 
Results:
Debugging in local mode:
 
Running:
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201547759-313619165.png
By hour of day (24 hours)
 
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201547778-1200014914.png
By stock type
 
 
Production mode:
 https://img2023.cnblogs.com/blog/3318273/202312/3318273-20231216201559751-1713337589.png
Submitting the topology to the cluster
                                                       
 
 
 
III. Code of the core computation bolts
1. Trade volume and total trade amount for each stock type:
package bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class TypeCountBolt extends BaseRichBolt {

    // NOTE: the array indices are missing from the published code. The column
    // positions below are assumptions; adjust them to the layout of your CSV.
    private static final int TYPE_COL = 2;    // stock type (assumed)
    private static final int PRICE_COL = 3;   // trade price (assumed)
    private static final int VOLUME_COL = 4;  // shares traded (assumed)

    OutputCollector collector;

    // stock type -> number of trades
    Map<String,Integer> map = new HashMap<String, Integer>();

    // stock type -> total trade amount
    Map<String,Float> map2 = new HashMap<String, Float>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] data = line.split(",");
        String type = data[TYPE_COL];

        Integer count = map.get(type);
        Float total_amount = map2.get(type);
        if(count==null){
            count = 0;
        }
        if(total_amount==null){
            total_amount = 0.0f;
        }
        count++;
        // amount of one trade = price * shares traded
        total_amount+=Float.parseFloat(data[PRICE_COL]) * Integer.parseInt(data[VOLUME_COL]);
        map.put(type,count);
        map2.put(type,total_amount);

        // print the running totals after every tuple
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
        for(Map.Entry<String,Integer> entry : map.entrySet()){
            System.out.println("Trade volume:");
            System.out.println(entry);
        }
        System.out.println();
        for(Map.Entry<String,Float> entry : map2.entrySet()){
            System.out.println("Total trade amount:");
            System.out.println(entry);
        }

        // BaseRichBolt does not ack automatically
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
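The per-type aggregation can be tried without a cluster. The following is a minimal sketch of the same bookkeeping in plain Java, assuming a three-column layout of type,price,volume that is made up for this example (the real CSV columns are not shown in the post). The class name TypeTotals and the sample rows are hypothetical. It uses double rather than float, because float keeps only about seven significant digits and a long running sum of amounts can drift.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone version of the per-type aggregation. */
class TypeTotals {
    // Assumed column layout for this sketch only: type,price,volume
    static final int TYPE_COL = 0;
    static final int PRICE_COL = 1;
    static final int VOLUME_COL = 2;

    final Map<String, Integer> tradeCount = new LinkedHashMap<>();
    final Map<String, Double> totalAmount = new LinkedHashMap<>();

    /** Folds one CSV line into the running totals. */
    void add(String line) {
        String[] data = line.split(",");
        String type = data[TYPE_COL];
        double amount = Double.parseDouble(data[PRICE_COL])
                * Integer.parseInt(data[VOLUME_COL]);
        // merge() inserts the value for a new key and combines it for an existing one
        tradeCount.merge(type, 1, Integer::sum);
        totalAmount.merge(type, amount, Double::sum);
    }

    public static void main(String[] args) {
        TypeTotals totals = new TypeTotals();
        // Made-up rows, not taken from the original data file
        for (String line : new String[] {"A,10.5,100", "B,2.0,50", "A,10.0,10"}) {
            totals.add(line);
        }
        System.out.println("Trade volume: " + totals.tradeCount);
        System.out.println("Total trade amount: " + totals.totalAmount);
    }
}
```

Map.merge replaces the get, null check, and put sequence with a single call, which is the main simplification over the bolt's version.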




 
2. Trade volume and total trade amount for each hour:
package bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("serial")
public class TimeCountBolt extends BaseRichBolt {

    // NOTE: the array indices are missing from the published code. The column
    // positions below are assumptions; adjust them to the layout of your CSV.
    private static final int TIME_COL = 0;    // trade timestamp (assumed)
    private static final int PRICE_COL = 3;   // trade price (assumed)
    private static final int VOLUME_COL = 4;  // shares traded (assumed)

    OutputCollector collector;

    // hour of day (0-23) -> number of trades
    Map<Integer,Integer> map = new HashMap<Integer, Integer>();

    // hour of day (0-23) -> total trade amount
    Map<Integer,Float> map2 = new HashMap<Integer, Float>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] data = line.split(",");

        // HH is the 24-hour field; hh (12-hour clock) would read 12:xx as hour 0
        SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar calendar = Calendar.getInstance();
        try {
            calendar.setTime(dateFormat.parse(data[TIME_COL]));
        } catch (ParseException e) {
            // skip rows with an unparseable timestamp instead of
            // counting them under the current hour
            e.printStackTrace();
            collector.ack(input);
            return;
        }
        int hour = calendar.get(Calendar.HOUR_OF_DAY);

        Integer count = map.get(hour);
        Float total_amount = map2.get(hour);
        if(count==null){
            count = 0;
        }
        if(total_amount==null){
            total_amount = 0.0f;
        }
        count++;
        // amount of one trade = price * shares traded
        total_amount+=Float.parseFloat(data[PRICE_COL]) * Integer.parseInt(data[VOLUME_COL]);
        map.put(hour,count);
        map2.put(hour,total_amount);

        // print the running totals after every tuple
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
        for(Map.Entry<Integer,Integer> entry : map.entrySet()){
            System.out.println("Trade volume:");
            System.out.println(entry);
        }
        System.out.println();
        for(Map.Entry<Integer,Float> entry : map2.entrySet()){
            System.out.println("Total trade amount:");
            System.out.println(entry);
        }

        // BaseRichBolt does not ack automatically
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
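Bucketing by hour depends on which pattern letter parses the hour. In SimpleDateFormat, HH is the 24-hour field (0-23) and hh is the 12-hour clock field (1-12); without an AM/PM marker, hh reads 12:30:00 as half past midnight, so trades made at noon land in bucket 0. The sketch below compares the two patterns on one made-up timestamp. The class and method names are hypothetical, and the time zone is pinned to UTC only to keep the result reproducible.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper comparing the 12-hour and 24-hour pattern letters. */
class HourPatternDemo {
    /** Parses the timestamp with the given pattern and returns the hour of day (0-23). */
    static int hourOf(String pattern, String timestamp) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(utc);
        Calendar calendar = Calendar.getInstance(utc, Locale.ROOT);
        try {
            calendar.setTime(format.parse(timestamp));
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable timestamp: " + timestamp, e);
        }
        return calendar.get(Calendar.HOUR_OF_DAY);
    }

    public static void main(String[] args) {
        String noon = "2023-12-16 12:30:00"; // made-up sample timestamp
        // 12-hour field: the 12 is taken as 12 AM
        System.out.println("hh -> " + hourOf("yyyy-MM-dd hh:mm:ss", noon));
        // 24-hour field: the 12 is taken as noon
        System.out.println("HH -> " + hourOf("yyyy-MM-dd HH:mm:ss", noon));
    }
}
```

For 24-hour timestamps such as those in a trade log, HH is the pattern that keeps every hour in its own bucket.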


