国产时序数据库IotDB安装、与SpringBoot集成

鼠扑  金牌会员 | 2022-9-16 17:18:55 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 942|帖子 942|积分 2826

一.简介:

本文将完成一个真实业务中的设备上报数据的一个例子,完整的展示后台服务接收到设备上报的数据后,将数据添加到时序数据库,并且将数据查询出来的一个例子。本文所有代码已经上传GitHub:https://github.com/Tom-shushu/work-study 下的 iotdb-demo 下。

IoTDB 是针对时间序列数据收集、存储与分析一体化的数据管理引擎。它具有体量轻、性能高、易使用的特点,完美对接 Hadoop 与 Spark 生态,适用于工业物联网应用中海量时间序列数据高速写入和复杂分析查询的需求。

我的理解:它就是一个树形结构的数据库可以很灵活的查询各个级下面的数据,因为它特殊的数据结构也使得它的查询效率会更高一些。

二.Docker安装IotDB:

1.拉取镜像(使用0.13,在使用的过程中0.14在查询时出现了问题)
  1. docker pull apache/iotdb:0.13.1-node
复制代码
2.创建数据文件和日志的 docker 挂载目录 (docker volume)
  1. docker volume create mydata
  2. docker volume create mylogs
复制代码
3.直接运行镜像
  1. docker run --name iotdb  -p 6667:6667 -v mydata:/iotdb/data -v mylogs:/iotdb/logs -d apache/iotdb:0.13.1-node /iotdb/bin/start-server.sh
复制代码
4.进入镜像并且登录IotDB
  1. docker exec  -it iotdb  /bin/bash
  2. /iotdb/sbin/start-cli.sh -h localhost -p 6667 -u root -pw root
复制代码

这样就算安装完成,然后打开服务器6667安全组

三.IotDB与SpringBoot集成

1.引入必要的依赖
  1.     <dependency>
  2.             <groupId>org.apache.iotdb</groupId>
  3.             <artifactId>iotdb-session</artifactId>
  4.             <version>0.14.0-preview1</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>cn.hutool</groupId>
  8.             <artifactId>hutool-all</artifactId>
  9.             <version>5.6.3</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>com.alibaba</groupId>
  13.             <artifactId>fastjson</artifactId>
  14.             <version>1.2.83</version>
  15.         </dependency>
  16.         <dependency>
  17.             <groupId>org.springframework.boot</groupId>
  18.             <artifactId>spring-boot-starter-web</artifactId>
  19.         </dependency>
  20.         <dependency>
  21.             <groupId>org.projectlombok</groupId>
  22.             <artifactId>lombok</artifactId>
  23.             <optional>true</optional>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>org.springframework.boot</groupId>
  27.             <artifactId>spring-boot-starter-test</artifactId>
  28.             <scope>test</scope>
  29.             <exclusions>
  30.                 <exclusion>
  31.                     <groupId>org.junit.vintage</groupId>
  32.                     <artifactId>junit-vintage-engine</artifactId>
  33.                 </exclusion>
  34.             </exclusions>
  35.         </dependency>
复制代码
2.编写配置类并且封装对应的方法 IotDBSessionConfig
  1. package com.zhouhong.iotdbdemo.config;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.apache.iotdb.rpc.IoTDBConnectionException;
  4. import org.apache.iotdb.rpc.StatementExecutionException;
  5. import org.apache.iotdb.session.Session;
  6. import org.apache.iotdb.session.SessionDataSet;
  7. import org.apache.iotdb.session.util.Version;
  8. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  9. import org.apache.iotdb.tsfile.write.record.Tablet;
  10. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. import org.springframework.stereotype.Component;
  14. import java.rmi.ServerException;
  15. import java.util.ArrayList;
  16. import java.util.List;
  17. /**
  18. * description: iotdb 配置工具类(常用部分,如需要可以自行扩展)
  19. * 注意:可以不需要创建分组,插入时默认前两个节点名称为分组名称 比如: root.a1eaKSRpRty.CA3013A303A25467 或者
  20. * root.a1eaKSRpRty.CA3013A303A25467.heart  他们的分组都为 root.a1eaKSRpRty
  21. * author: zhouhong
  22. */
  23. @Log4j2
  24. @Component
  25. @Configuration
  26. public class IotDBSessionConfig {
  27.     private static Session session;
  28.     private static final String LOCAL_HOST = "XXX.XX.XXX.XX";
  29.     @Bean
  30.     public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
  31.         if (session == null) {
  32.             log.info("正在连接iotdb.......");
  33.             session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
  34.             session.open(false);
  35.             session.setFetchSize(100);
  36.             log.info("iotdb连接成功~");
  37.             // 设置时区
  38.             session.setTimeZone("+08:00");
  39.         }
  40.         return session;
  41.     }
  42.     /**
  43.      * description: 带有数据类型的添加操作 - insertRecord没有指定类型
  44.      * author: zhouhong
  45.      * @param  * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467
  46.      *                  time:时间戳
  47.      *                  measurementsList:物理量 即:属性
  48.      *                  type:数据类型: BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6);
  49.      *                  valuesList:属性值 --- 属性必须与属性值一一对应
  50.      * @return
  51.      */
  52.     public void insertRecordType(String deviceId, Long time,List<String>  measurementsList, TSDataType type,List<Object> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
  53.         if (measurementsList.size() != valuesList.size()) {
  54.             throw new ServerException("measurementsList 与 valuesList 值不对应");
  55.         }
  56.         List<TSDataType> types = new ArrayList<>();
  57.         measurementsList.forEach(item -> {
  58.             types.add(type);
  59.         });
  60.         session.insertRecord(deviceId, time, measurementsList, types, valuesList);
  61.     }
  62.     /**
  63.      * description: 带有数据类型的添加操作 - insertRecord没有指定类型
  64.      * author: zhouhong
  65.      * @param  deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467
  66.      * @param  time:时间戳
  67.      * @param  measurementsList:物理量 即:属性
  68.      * @param  valuesList:属性值 --- 属性必须与属性值一一对应
  69.      * @return
  70.      */
  71.     public void insertRecord(String deviceId, Long time,List<String>  measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
  72.         if (measurementsList.size() == valuesList.size()) {
  73.             session.insertRecord(deviceId, time, measurementsList, valuesList);
  74.         } else {
  75.             log.error("measurementsList 与 valuesList 值不对应");
  76.         }
  77.     }
  78.     /**
  79.      * description: 批量插入
  80.      * author: zhouhong
  81.      */
  82.     public void insertRecords(List<String> deviceIdList, List<Long> timeList, List<List<String>> measurementsList, List<List<String>> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
  83.         if (measurementsList.size() == valuesList.size()) {
  84.             session.insertRecords(deviceIdList, timeList, measurementsList, valuesList);
  85.         } else {
  86.             log.error("measurementsList 与 valuesList 值不对应");
  87.         }
  88.     }
  89.     /**
  90.      * description: 插入操作
  91.      * author: zhouhong
  92.      * @param  deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467
  93.      *  @param  time:时间戳
  94.      *  @param  schemaList: 属性值 + 数据类型 例子: List<MeasurementSchema> schemaList = new ArrayList<>();  schemaList.add(new MeasurementSchema("breath", TSDataType.INT64));
  95.      *  @param  maxRowNumber:
  96.      * @return
  97.      */
  98.     public void insertTablet(String deviceId,  Long time,List<MeasurementSchema> schemaList, List<Object> valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException {
  99.         Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
  100.         // 向iotdb里面添加数据
  101.         int rowIndex = tablet.rowSize++;
  102.         tablet.addTimestamp(rowIndex, time);
  103.         for (int i = 0; i < valueList.size(); i++) {
  104.             tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i));
  105.         }
  106.         if (tablet.rowSize == tablet.getMaxRowNumber()) {
  107.             session.insertTablet(tablet, true);
  108.             tablet.reset();
  109.         }
  110.         if (tablet.rowSize != 0) {
  111.             session.insertTablet(tablet);
  112.             tablet.reset();
  113.         }
  114.     }
  115.     /**
  116.      * description: 根据SQL查询
  117.      * author: zhouhong
  118.      */
  119.     public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
  120.         return session.executeQueryStatement(sql);
  121.     }
  122.     /**
  123.      * description: 删除分组 如 root.a1eaKSRpRty
  124.      * author: zhouhong
  125.      * @param  groupName:分组名称
  126.      * @return
  127.      */
  128.     public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
  129.         session.deleteStorageGroup(groupName);
  130.     }
  131.     /**
  132.      * description: 根据Timeseries删除  如:root.a1eaKSRpRty.CA3013A303A25467.breath  (个人理解:为具体的物理量)
  133.      * author: zhouhong
  134.      */
  135.     public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
  136.         session.deleteTimeseries(timeseries);
  137.     }
  138.     /**
  139.      * description: 根据Timeseries批量删除
  140.      * author: zhouhong
  141.      */
  142.     public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
  143.         session.deleteTimeseries(timeseriesList);
  144.     }
  145.     /**
  146.      * description: 根据分组批量删除
  147.      * author: zhouhong
  148.      */
  149.     public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
  150.         session.deleteStorageGroups(storageGroupList);
  151.     }
  152.     /**
  153.      * description: 根据路径和结束时间删除 结束时间之前的所有数据
  154.      * author: zhouhong
  155.      */
  156.     public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
  157.         session.deleteData(path, endTime);
  158.     }
  159.     /**
  160.      * description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据
  161.      * author: zhouhong
  162.      */
  163.     public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
  164.         session.deleteData(pathList, endTime);
  165.     }
  166.     /**
  167.      * description: 根据路径集合和时间段批量删除
  168.      * author: zhouhong
  169.      */
  170.     public void deleteDataByPathListAndTime(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
  171.         session.deleteData(pathList, startTime, endTime);
  172.     }
  173. }
复制代码
3.入参
  1. package com.zhouhong.iotdbdemo.model.param;
  2. import lombok.Data;
  3. /**
  4. * description: 入参
  5. * date: 2022/8/15 21:53
  6. * author: zhouhong
  7. */
  8. @Data
  9. public class IotDbParam {
  10.     /***
  11.      * 产品PK
  12.      */
  13.     private  String  pk;
  14.     /***
  15.      * 设备号
  16.      */
  17.     private  String  sn;
  18.     /***
  19.      * 时间
  20.      */
  21.     private Long time;
  22.     /***
  23.      * 实时呼吸
  24.      */
  25.     private String breath;
  26.     /***
  27.      * 实时心率
  28.      */
  29.     private String heart;
  30.     /***
  31.      * 查询开始时间
  32.      */
  33.     private String startTime;
  34.     /***
  35.      * 查询结束时间
  36.      */
  37.     private String endTime;
  38. }
复制代码
4.返回参数
  1. package com.zhouhong.iotdbdemo.model.result;
  2. import lombok.Data;
  3. /**
  4. * description: 返回结果
  5. * date: 2022/8/15 21:56
  6. * author: zhouhong
  7. */
  8. @Data
  9. public class IotDbResult {
  10.     /***
  11.      * 时间
  12.      */
  13.     private String time;
  14.     /***
  15.      * 产品PK
  16.      */
  17.     private  String  pk;
  18.     /***
  19.      * 设备号
  20.      */
  21.     private  String  sn;
  22.     /***
  23.      * 实时呼吸
  24.      */
  25.     private String breath;
  26.     /***
  27.      * 实时心率
  28.      */
  29.     private String heart;
  30. }
复制代码
5.使用
  1. package com.zhouhong.iotdbdemo.server.impl;
  2. import com.zhouhong.iotdbdemo.config.IotDBSessionConfig;
  3. import com.zhouhong.iotdbdemo.model.param.IotDbParam;
  4. import com.zhouhong.iotdbdemo.model.result.IotDbResult;
  5. import com.zhouhong.iotdbdemo.server.IotDbServer;
  6. import lombok.extern.log4j.Log4j2;
  7. import org.apache.iotdb.rpc.IoTDBConnectionException;
  8. import org.apache.iotdb.rpc.StatementExecutionException;
  9. import org.apache.iotdb.session.SessionDataSet;
  10. import org.apache.iotdb.tsfile.read.common.Field;
  11. import org.apache.iotdb.tsfile.read.common.RowRecord;
  12. import org.springframework.stereotype.Service;
  13. import javax.annotation.Resource;
  14. import java.rmi.ServerException;
  15. import java.util.ArrayList;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.Map;
  19. /**
  20. * description: iot服务实现类
  21. * date: 2022/8/15 9:43
  22. * author: zhouhong
  23. */
  24. @Log4j2
  25. @Service
  26. public class IotDbServerImpl implements IotDbServer {
  27.     @Resource
  28.     private IotDBSessionConfig iotDBSessionConfig;
  29.     @Override
  30.     public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
  31.         // iotDbParam: 模拟设备上报消息
  32.         // bizkey: 业务唯一key  PK :产品唯一编码   SN:设备唯一编码
  33.         String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();
  34.         // 将设备上报的数据存入数据库(时序数据库)
  35.         List<String> measurementsList = new ArrayList<>();
  36.         measurementsList.add("heart");
  37.         measurementsList.add("breath");
  38.         List<String> valuesList = new ArrayList<>();
  39.         valuesList.add(String.valueOf(iotDbParam.getHeart()));
  40.         valuesList.add(String.valueOf(iotDbParam.getBreath()));
  41.         iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
  42.     }
  43.     @Override
  44.     public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
  45.         List<IotDbResult> iotDbResultList = new ArrayList<>();
  46.         if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
  47.             String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
  48.                     + iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
  49.             SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
  50.             List<String> columnNames = sessionDataSet.getColumnNames();
  51.             List<String> titleList = new ArrayList<>();
  52.             // 排除Time字段 -- 方便后面后面拼装数据
  53.             for (int i = 1; i < columnNames.size(); i++) {
  54.                 String[] temp = columnNames.get(i).split("\\.");
  55.                 titleList.add(temp[temp.length - 1]);
  56.             }
  57.             // 封装处理数据
  58.             packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
  59.         } else {
  60.             log.info("PK或者SN不能为空!!");
  61.         }
  62.         return iotDbResultList;
  63.     }
  64.     /**
  65.      * 封装处理数据
  66.      * @param iotDbParam
  67.      * @param iotDbResultList
  68.      * @param sessionDataSet
  69.      * @param titleList
  70.      * @throws StatementExecutionException
  71.      * @throws IoTDBConnectionException
  72.      */
  73.     private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
  74.             throws StatementExecutionException, IoTDBConnectionException {
  75.         int fetchSize = sessionDataSet.getFetchSize();
  76.         if (fetchSize > 0) {
  77.             while (sessionDataSet.hasNext()) {
  78.                 IotDbResult iotDbResult = new IotDbResult();
  79.                 RowRecord next = sessionDataSet.next();
  80.                 List<Field> fields = next.getFields();
  81.                 String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
  82.                 iotDbResult.setTime(timeString);
  83.                 Map<String, String> map = new HashMap<>();
  84.                 for (int i = 0; i < fields.size(); i++) {
  85.                     Field field = fields.get(i);
  86.                     // 这里的需要按照类型获取
  87.                     map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
  88.                 }
  89.                 iotDbResult.setTime(timeString);
  90.                 iotDbResult.setPk(iotDbParam.getPk());
  91.                 iotDbResult.setSn(iotDbParam.getSn());
  92.                 iotDbResult.setHeart(map.get("heart"));
  93.                 iotDbResult.setBreath(map.get("breath"));
  94.                 iotDbResultList.add(iotDbResult);
  95.             }
  96.         }
  97.     }
  98. }
复制代码
6.控制层
  1. package com.zhouhong.iotdbdemo.controller;
  2. import com.zhouhong.iotdbdemo.config.IotDBSessionConfig;
  3. import com.zhouhong.iotdbdemo.model.param.IotDbParam;
  4. import com.zhouhong.iotdbdemo.response.ResponseData;
  5. import com.zhouhong.iotdbdemo.server.IotDbServer;
  6. import lombok.extern.log4j.Log4j2;
  7. import org.apache.iotdb.rpc.IoTDBConnectionException;
  8. import org.apache.iotdb.rpc.StatementExecutionException;
  9. import org.springframework.web.bind.annotation.*;
  10. import javax.annotation.Resource;
  11. import java.rmi.ServerException;
  12. /**
  13. * description: iotdb 控制层
  14. * date: 2022/8/15 21:50
  15. * author: zhouhong
  16. */
  17. @Log4j2
  18. @RestController
  19. public class IotDbController {
  20.     @Resource
  21.     private IotDbServer iotDbServer;
  22.     @Resource
  23.     private IotDBSessionConfig iotDBSessionConfig;
  24.     /**
  25.      * 插入数据
  26.      * @param iotDbParam
  27.      */
  28.     @PostMapping("/api/device/insert")
  29.     public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
  30.         iotDbServer.insertData(iotDbParam);
  31.         return ResponseData.success();
  32.     }
  33.     /**
  34.      * 插入数据
  35.      * @param iotDbParam
  36.      */
  37.     @PostMapping("/api/device/queryData")
  38.     public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
  39.         return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
  40.     }
  41.     /**
  42.      * 删除分组
  43.      * @return
  44.      */
  45.     @PostMapping("/api/device/deleteGroup")
  46.     public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
  47.         iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
  48.         iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
  49.         return ResponseData.success();
  50.     }
  51. }
复制代码
四.PostMan测试

1.添加一条记录

接口:localhost:8080/api/device/insert

入参:
  1. {
  2.     "time":1660573444672,
  3.     "pk":"a1TTQK9TbKT",
  4.     "sn":"SN202208120945QGJLD",
  5.     "breath":"17",
  6.     "heart":"68"
  7. }
复制代码

 
 
 查看IotDB数据


 
 
 2.根据SQL查询时间区间记录(其他查询以此类推)

接口:localhost:8080/api/device/queryData

入参:
  1. {
  2.     "pk":"a1TTQK9TbKT",
  3.     "sn":"SN202208120945QGJLD",
  4.     "startTime":"2022-08-14 00:00:00",
  5.     "endTime":"2022-08-16 00:00:00"
  6. }
复制代码
结果:
  1. {
  2.     "success": true,
  3.     "code": 200,
  4.     "message": "请求成功",
  5.     "localizedMsg": "请求成功",
  6.     "data": [
  7.         {
  8.             "time": "2022-08-15 22:24:04",
  9.             "pk": "a1TTQK9TbKT",
  10.             "sn": "SN202208120945QGJLD",
  11.             "breath": "19.0",
  12.             "heart": "75.0"
  13.         },
  14.         {
  15.             "time": "2022-08-15 22:24:04",
  16.             "pk": "a1TTQK9TbKT",
  17.             "sn": "SN202208120945QGJLD",
  18.             "breath": "20.0",
  19.             "heart": "78.0"
  20.         },
  21.         {
  22.             "time": "2022-08-15 22:24:04",
  23.             "pk": "a1TTQK9TbKT",
  24.             "sn": "SN202208120945QGJLD",
  25.             "breath": "17.0",
  26.             "heart": "68.0"
  27.         }
  28.     ]
  29. }
复制代码
IotDB还支持分页、聚合等等其他操作,详细信息可以参考 https://iotdb.apache.org/zh/UserGuide/Master/Query-Data/Overview.html


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表