SpringBoot集成时序数据库IoTDB(IotDB-Session方式)

守听  金牌会员 | 2025-1-23 19:46:30 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 873|帖子 873|积分 2619

阐明:项目需利用时序数据库,后经调研选用了IoTDB数据库,因此需要在SpringBoot框架内集成该数据库,来进行数据查询、插入操作。根据官方文档Java可采用多种方式访问数据库,分别是Java原生接口、JDBC(不保举)和REST API,下面选用Java原生接口和MyBatis的方式操作数据库。
现实应用中需要高性能吞吐则利用Java原生接口的方式,但一些小量级的增删查可以利用MyBatis的方式。
前提:已摆设安装IoTDB数据库,具体安装摆设流程可查察官网文档。
1、IotDB-Session

1.1、pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.example</groupId>
  7.     <artifactId>springboot-iotdb</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>8</maven.compiler.source>
  11.         <maven.compiler.target>8</maven.compiler.target>
  12.     </properties>
  13.     <parent>
  14.         <artifactId>spring-boot-starter-parent</artifactId>
  15.         <groupId>org.springframework.boot</groupId>
  16.         <version>2.1.5.RELEASE</version>
  17.     </parent>
  18.     <dependencies>
  19.         <dependency>
  20.             <groupId>org.projectlombok</groupId>
  21.             <artifactId>lombok</artifactId>
  22.             <version>RELEASE</version>
  23.             <scope>compile</scope>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>cn.hutool</groupId>
  27.             <artifactId>hutool-all</artifactId>
  28.             <version>5.8.28</version>
  29.             <scope>compile</scope>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.springframework.boot</groupId>
  33.             <artifactId>spring-boot-autoconfigure</artifactId>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.springframework.boot</groupId>
  37.             <artifactId>spring-boot-starter-web</artifactId>
  38.         </dependency>
  39.         <!-- LOG4J -->
  40.         <dependency>
  41.             <groupId>log4j</groupId>
  42.             <artifactId>log4j</artifactId>
  43.             <version>1.2.16</version>
  44.             <scope>compile</scope>
  45.         </dependency>
  46.         <dependency>
  47.             <groupId>org.springframework.boot</groupId>
  48.             <artifactId>spring-boot-starter-test</artifactId>
  49.             <scope>test</scope>
  50.         </dependency>
  51.         <dependency>
  52.             <groupId>org.apache.iotdb</groupId>
  53.             <artifactId>iotdb-session</artifactId>
  54.             <version>1.3.1</version>
  55.         </dependency>
  56.         <dependency>
  57.             <groupId>com.alibaba.fastjson2</groupId>
  58.             <artifactId>fastjson2</artifactId>
  59.             <version>2.0.51</version>
  60.         </dependency>
  61.     </dependencies>
  62. </project>
复制代码
1.2、application.yml

  1. spring:
  2.   iotdb:
  3.     username: root
  4.     password: root
  5.     ip: 100.100.100.100
  6.     port: 6667
  7.     maxSize: 100
  8. server:
  9.   port: 8080
复制代码
1.3、IotDbSessionConfig

  1. package com.example.config;
  2. import org.apache.iotdb.session.pool.SessionPool;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. @Configuration
  10. public class IotDbSessionConfig {
  11.     @Value("${spring.iotdb.username}")
  12.     private String username;
  13.     @Value("${spring.iotdb.password}")
  14.     private String password;
  15.     @Value("${spring.iotdb.ip}")
  16.     private String ip;
  17.     @Value("${spring.iotdb.port}")
  18.     private int port;
  19.     @Value("${spring.iotdb.maxSize}")
  20.     private int maxSize;
  21.     private static SessionPool sessionPool;
  22.     public SessionPool getSessionPool() {
  23.         if (sessionPool == null) {
  24.             sessionPool = new SessionPool(ip, port, username, password, maxSize);
  25.         }
  26.         return sessionPool;
  27.     }
  28. }
复制代码
1.4、IotDbService

  1. package com.example.service;
  2. import com.example.dto.IotDbRecordAble;
  3. import com.example.dto.MeasurementSchemaValuesDTO;
  4. import org.apache.iotdb.common.rpc.thrift.TAggregationType;
  5. import org.apache.iotdb.isession.SessionDataSet;
  6. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  7. import org.apache.iotdb.tsfile.write.record.Tablet;
  8. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  9. import java.util.List;
  10. import java.util.Map;
  11. public interface IotDbService {
  12.     void insertTablet(Tablet tablet);
  13.     void insertTablets(Map<String, Tablet> tablets);
  14.     void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
  15.     void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values);
  16.     void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
  17.     void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
  18.     void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
  19.     void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
  20.     void deleteData(String path, long endTime);
  21.     void deleteData(List<String> paths, long endTime);
  22.     SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut);
  23.     <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut, Class<? extends IotDbRecordAble> clazz);
  24.     SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
  25.     <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz);
  26.     SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes);
  27.     <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz);
  28.     SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
  29.     SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime);
  30.     SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval);
  31.     SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep);
  32.     SessionDataSet executeQueryStatement(String sql);
  33.     void executeNonQueryStatement(String sql);
  34.     List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> columnNames);
  35.     /**
  36.     *(不支持聚合查询)
  37.     **/
  38.     <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble>  clazz);
  39.     List<MeasurementSchema> buildMeasurementSchemas(Object object);
  40.     MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object);
  41. }
复制代码
1.5、IotDbServiceImpl

  1. package com.example.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.example.config.IotDbSessionConfig;
  4. import com.example.dto.IotDbRecordAble;
  5. import com.example.dto.MeasurementSchemaValuesDTO;
  6. import com.example.service.IotDbService;
  7. import lombok.SneakyThrows;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.iotdb.common.rpc.thrift.TAggregationType;
  10. import org.apache.iotdb.isession.SessionDataSet;
  11. import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
  12. import org.apache.iotdb.session.pool.SessionPool;
  13. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  14. import org.apache.iotdb.tsfile.read.common.Field;
  15. import org.apache.iotdb.tsfile.read.common.RowRecord;
  16. import org.apache.iotdb.tsfile.write.record.Tablet;
  17. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Service;
  20. import java.lang.reflect.Type;
  21. import java.util.*;
  22. import java.util.stream.Collectors;
  23. @Service
  24. @Slf4j
  25. public class IotDbServiceImpl implements IotDbService {
  26.     @Autowired
  27.     private IotDbSessionConfig iotDBSessionConfig;
  28.     /**
  29.      * 单设备批量插入数据
  30.      *
  31.      * @param tablet
  32.      */
  33.     @Override
  34.     public void insertTablet(Tablet tablet) {
  35.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  36.         try {
  37.             log.info("iotdb数据入库:tablet:[{}]", tablet);
  38.             sessionPool.insertTablet(tablet);
  39.         } catch (Exception e) {
  40.             log.error("IotDBSession insertTablet失败: tablet={},  error={}", tablet, e.getMessage());
  41.         }
  42.     }
  43.     /**
  44.      * 多设备批量插入数据
  45.      *
  46.      * @param tablets
  47.      */
  48.     @Override
  49.     public void insertTablets(Map<String, Tablet> tablets) {
  50.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  51.         try {
  52.             log.info("iotdb数据入库:tablets:[{}]", tablets);
  53.             sessionPool.insertTablets(tablets);
  54.         } catch (Exception e) {
  55.             log.error("IotDBSession insertTablets失败: tablets={},  error={}", tablets, e.getMessage());
  56.         }
  57.     }
  58.     /**
  59.      * 单条数据插入(string类型数据项)
  60.      *
  61.      * @param deviceId     设备名(表名)root.ln.wf01.wt01
  62.      * @param time         时间戳
  63.      * @param measurements 数据项列表
  64.      * @param values       数据项对应值列表
  65.      */
  66.     @Override
  67.     public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
  68.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  69.         try {
  70.             log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
  71.             sessionPool.insertRecord(deviceId, time, measurements, values);
  72.         } catch (Exception e) {
  73.             log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
  74.                     deviceId, time, measurements, values, e.getMessage());
  75.         }
  76.     }
  77.     /**
  78.      * 单条数据插入(不同类型数据项)
  79.      *
  80.      * @param deviceId     设备名(表名)root.ln.wf01.wt01
  81.      * @param time         时间戳
  82.      * @param measurements 数据项列表
  83.      * @param types        数据项对应类型列表
  84.      * @param values       数据项对应值列表
  85.      */
  86.     @Override
  87.     public void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) {
  88.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  89.         try {
  90.             log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values);
  91.             sessionPool.insertRecord(deviceId, time, measurements, types, values);
  92.         } catch (Exception e) {
  93.             log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}",
  94.                     deviceId, time, measurements, types, values, e.getMessage());
  95.         }
  96.     }
  97.     /**
  98.      * 多个设备多条数据插入(string类型数据项)
  99.      *
  100.      * @param deviceIds        多个设备名(表名)root.ln.wf01.wt01
  101.      * @param times            时间戳的列表
  102.      * @param measurementsList 数据项列表的列表
  103.      * @param valuesList       数据项对应值列表的列表
  104.      */
  105.     @Override
  106.     public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
  107.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  108.         try {
  109.             log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList);
  110.             sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
  111.         } catch (Exception e) {
  112.             log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}",
  113.                     deviceIds, times, measurementsList, valuesList, e.getMessage());
  114.         }
  115.     }
  116.     /**
  117.      * 多个设备多条数据插入(不同类型数据项)
  118.      *
  119.      * @param deviceIds        多个设备名(表名))root.ln.wf01.wt01
  120.      * @param times            时间戳的列表
  121.      * @param measurementsList 数据项列表的列表
  122.      * @param typesList        数据项对应类型列表的列表
  123.      * @param valuesList       数据项对应值列表的列表
  124.      */
  125.     @Override
  126.     public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
  127.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  128.         try {
  129.             log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList);
  130.             sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
  131.         } catch (Exception e) {
  132.             log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}",
  133.                     deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
  134.         }
  135.     }
  136.     /**
  137.      * 单个设备多条数据插入(string类型数据项)
  138.      *
  139.      * @param deviceId         单个设备名(表名))root.ln.wf01.wt01
  140.      * @param times            时间戳的列表
  141.      * @param measurementsList 数据项列表的列表
  142.      * @param valuesList       数据项对应值列表的列表
  143.      */
  144.     @Override
  145.     public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
  146.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  147.         try {
  148.             log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList);
  149.             sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
  150.         } catch (Exception e) {
  151.             log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}",
  152.                     deviceId, times, measurementsList, valuesList, e.getMessage());
  153.         }
  154.     }
  155.     /**
  156.      * 单个设备多条数据插入(不同类型数据项)
  157.      *
  158.      * @param deviceId         单个设备名(表名))root.ln.wf01.wt01
  159.      * @param times            时间戳的列表
  160.      * @param measurementsList 数据项列表的列表
  161.      * @param typesList        数据项对应类型列表的列表
  162.      * @param valuesList       数据项对应值列表的列表
  163.      */
  164.     @Override
  165.     public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
  166.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  167.         try {
  168.             log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList);
  169.             sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
  170.         } catch (Exception e) {
  171.             log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
  172.         }
  173.     }
  174.     /**
  175.      * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据)
  176.      *
  177.      * @param path    单个字段 root.ln.wf01.wt01.temperature
  178.      * @param endTime 删除时间点
  179.      */
  180.     @Override
  181.     public void deleteData(String path, long endTime) {
  182.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  183.         try {
  184.             log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime);
  185.             sessionPool.deleteData(path, endTime);
  186.         } catch (Exception e) {
  187.             log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
  188.         }
  189.     }
  190.     /**
  191.      * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据)
  192.      *
  193.      * @param paths   多个字段(表名)) root.ln.wf01.wt01.temperature
  194.      * @param endTime 删除时间点
  195.      */
  196.     @Override
  197.     public void deleteData(List<String> paths, long endTime) {
  198.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  199.         try {
  200.             log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime);
  201.             sessionPool.deleteData(paths, endTime);
  202.         } catch (Exception e) {
  203.             log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
  204.         }
  205.     }
  206.     /**
  207.      * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
  208.      *
  209.      * @param paths     多个字段(表名)) root.ln.wf01.wt01.temperature
  210.      * @param startTime 开始时间
  211.      * @param endTime   结束时间
  212.      * @param outTime   超时时间
  213.      * @return SessionDataSet  (Time,paths)
  214.      */
  215.     @Override
  216.     public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
  217.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  218.         SessionDataSetWrapper sessionDataSetWrapper = null;
  219.         try {
  220.             log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime);
  221.             sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
  222.             return sessionDataSetWrapper.getSessionDataSet();
  223.         } catch (Exception e) {
  224.             log.error("IotDBSession executeRawDataQuery失败: paths={},  startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
  225.         }
  226.         return null;
  227.     }
  228.     /**
  229.      * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
  230.      *
  231.      * @param paths     多个字段(表名)) root.ln.wf01.wt01.temperature
  232.      * @param startTime 开始时间
  233.      * @param endTime   结束时间
  234.      * @param outTime   超时时间
  235.      * @param clazz     返回数据对应的对象(对象属性必须与字段名对应)
  236.      * @param <T>
  237.      * @return
  238.      */
  239.     @Override
  240.     public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime, Class<? extends IotDbRecordAble> clazz) {
  241.         SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
  242.         List<String> columnNames = sessionDataSet.getColumnNames();
  243.         List<T> resultEntities = null;
  244.         try {
  245.             resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
  246.         } catch (Exception e) {
  247.             log.error("IotDBSession executeRawDataQuery失败: paths={},  startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
  248.         }
  249.         return resultEntities;
  250.     }
  251.     /**
  252.      * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
  253.      *
  254.      * @param paths    多个字段(表名)) root.ln.wf01.wt01.temperature
  255.      * @param lastTime 结束时间
  256.      * @return SessionDataSet
  257.      */
  258.     @Override
  259.     public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
  260.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  261.         SessionDataSetWrapper sessionDataSetWrapper = null;
  262.         try {
  263.             log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime);
  264.             sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
  265.             return sessionDataSetWrapper.getSessionDataSet();
  266.         } catch (Exception e) {
  267.             log.error("IotDBSession executeLastDataQuery失败: paths={},  lastTime:[{}], error={}", paths, lastTime, e.getMessage());
  268.         }
  269.         return null;
  270.     }
  271.     /**
  272.      * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
  273.      *
  274.      * @param <T>
  275.      * @param paths    多个字段(表名)) root.ln.wf01.wt01.temperature
  276.      * @param lastTime 结束时间
  277.      * @param clazz    返回数据对应的对象(对象属性必须与字段名对应)
  278.      * @return
  279.      */
  280.     @Override
  281.     public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz) {
  282.         SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
  283.         List<String> columnNames = sessionDataSet.getColumnNames();
  284.         List<T> resultEntities = null;
  285.         try {
  286.             resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
  287.         } catch (Exception e) {
  288.             log.error("IotDBSession executeLastDataQuery失败: paths={},  lastTime:[{}], error={}", paths, lastTime, e.getMessage());
  289.         }
  290.         return resultEntities;
  291.     }
  292.     /**
  293.      * 最新点查询(快速查询单设备下指定序列最新点)
  294.      *
  295.      * @param db               root.ln.wf01
  296.      * @param device           root.ln.wf01.wt01
  297.      * @param sensors          temperature,status(字段名)
  298.      * @param isLegalPathNodes true(避免路径校验)
  299.      * @return SessionDataSet
  300.      */
  301.     @Override
  302.     public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) {
  303.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  304.         SessionDataSetWrapper sessionDataSetWrapper = null;
  305.         try {
  306.             log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes);
  307.             sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
  308.             return sessionDataSetWrapper.getSessionDataSet();
  309.         } catch (Exception e) {
  310.             log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
  311.         }
  312.         return null;
  313.     }
  314.     /**
  315.      * @param db               root.ln.wf01
  316.      * @param device           root.ln.wf01.wt01
  317.      * @param sensors          temperature,status(字段名)
  318.      * @param isLegalPathNodes true(避免路径校验)
  319.      * @param clazz            返回数据对应的对象(对象属性必须与字段名对应)
  320.      * @param <T>
  321.      * @return
  322.      */
  323.     @Override
  324.     public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz) {
  325.         SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
  326.         List<String> columnNames = sessionDataSet.getColumnNames();
  327.         List<T> resultEntities = null;
  328.         try {
  329.             resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
  330.         } catch (Exception e) {
  331.             log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
  332.         }
  333.         return resultEntities;
  334.     }
  335.     /**
  336.      * 聚合查询
  337.      *
  338.      * @param paths        多个字段(表名)) root.ln.wf01.wt01.temperature
  339.      * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
  340.      * @return SessionDataSet
  341.      */
  342.     @Override
  343.     public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
  344.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  345.         SessionDataSetWrapper sessionDataSetWrapper = null;
  346.         try {
  347.             log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations);
  348.             sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
  349.             return sessionDataSetWrapper.getSessionDataSet();
  350.         } catch (Exception e) {
  351.             log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", paths, aggregations, e.getMessage());
  352.         }
  353.         return null;
  354.     }
  355.     /**
  356.      * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
  357.      *
  358.      * @param paths        多个字段(表名)) root.ln.wf01.wt01.temperature
  359.      * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
  360.      * @param startTime    开始时间(包含)
  361.      * @param endTime      结束时间
  362.      * @return SessionDataSet
  363.      */
  364.     @Override
  365.     public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) {
  366.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  367.         SessionDataSetWrapper sessionDataSetWrapper = null;
  368.         try {
  369.             log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", paths, aggregations, startTime, endTime);
  370.             sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
  371.             return sessionDataSetWrapper.getSessionDataSet();
  372.         } catch (Exception e) {
  373.             log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
  374.         }
  375.         return null;
  376.     }
  377.     /**
  378.      * 聚合查询(支持按照时间区间分段查询)
  379.      *
  380.      * @param paths        多个字段(表名)) root.ln.wf01.wt01.temperature
  381.      * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
  382.      * @param startTime    开始时间(包含)
  383.      * @param endTime      结束时间
  384.      * @param interval
  385.      * @return SessionDataSet
  386.      */
  387.     @Override
  388.     public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) {
  389.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  390.         SessionDataSetWrapper sessionDataSetWrapper = null;
  391.         try {
  392.             log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval);
  393.             sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
  394.             return sessionDataSetWrapper.getSessionDataSet();
  395.         } catch (Exception e) {
  396.             log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage());
  397.         }
  398.         return null;
  399.     }
  400.     /**
  401.      * 聚合查询(支持按照时间区间分段查询)
  402.      *
  403.      * @param paths        多个字段(表名)) root.ln.wf01.wt01.temperature
  404.      * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
  405.      * @param startTime    开始时间(包含)
  406.      * @param endTime      结束时间
  407.      * @param interval
  408.      * @param slidingStep
  409.      * @return SessionDataSet
  410.      */
  411.     @Override
  412.     public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) {
  413.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  414.         SessionDataSetWrapper sessionDataSetWrapper = null;
  415.         try {
  416.             log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
  417.             sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep);
  418.             return sessionDataSetWrapper.getSessionDataSet();
  419.         } catch (Exception e) {
  420.             log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
  421.         }
  422.         return null;
  423.     }
  424.     /**
  425.      * SQL查询
  426.      *
  427.      * @param sql
  428.      * @return
  429.      */
  430.     @Override
  431.     public SessionDataSet executeQueryStatement(String sql) {
  432.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  433.         SessionDataSetWrapper sessionDataSetWrapper = null;
  434.         try {
  435.             log.info("iotdb SQL查询:sql:[{}]", sql);
  436.              sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
  437.             return sessionDataSetWrapper.getSessionDataSet();
  438.         } catch (Exception e) {
  439.             log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
  440.         }
  441.         return null;
  442.     }
  443.     /**
  444.      * SQL非查询
  445.      *
  446.      * @param sql
  447.      */
  448.     @Override
  449.     public void executeNonQueryStatement(String sql) {
  450.         SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
  451.         try {
  452.             log.info("iotdb SQL无查询:sql:[{}]", sql);
  453.             sessionPool.executeNonQueryStatement(sql);
  454.         } catch (Exception e) {
  455.             log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
  456.         }
  457.     }
  458.     /**
  459.      * 封装处理数据
  460.      *
  461.      * @param sessionDataSet
  462.      * @param titleList
  463.      */
  464.     @SneakyThrows
  465.     @Override
  466.     public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
  467.         int fetchSize = sessionDataSet.getFetchSize();
  468.         List<Map<String, Object>> resultList = new ArrayList<>();
  469.         titleList.remove("Time");
  470.         if (fetchSize > 0) {
  471.             while (sessionDataSet.hasNext()) {
  472.                 Map<String, Object> resultMap = new HashMap<>();
  473.                 RowRecord next = sessionDataSet.next();
  474.                 List<Field> fields = next.getFields();
  475.                 String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
  476.                 resultMap.put("time", timeString);
  477.                 for (int i = 0; i < fields.size(); i++) {
  478.                     Field field = fields.get(i);
  479.                     if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
  480.                         resultMap.put(splitString(titleList.get(i)), null);
  481.                     } else {
  482.                         resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString());
  483.                     }
  484.                 }
  485.                 resultList.add(resultMap);
  486.             }
  487.         }
  488.         return resultList;
  489.     }
  490.     /**
  491.      * 封装处理数据(不支持聚合查询)
  492.      *
  493.      * @param sessionDataSet 查询返回的结果集
  494.      * @param titleList      查询返回的结果集内的字段名
  495.      * @param clazz          返回数据对应的对象(对象属性必须与字段名对应)
  496.      * @param <T>
  497.      * @return
  498.      */
  499.     @SneakyThrows
  500.     @Override
  501.     public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble> clazz) {
  502.         int fetchSize = sessionDataSet.getFetchSize();
  503.         List<T> resultList = new ArrayList<>();
  504.         titleList.remove("Time");
  505.         if (fetchSize > 0) {
  506.             while (sessionDataSet.hasNext()) {
  507.                 Map<String, Object> resultMap = new HashMap<>();
  508.                 RowRecord next = sessionDataSet.next();
  509.                 List<Field> fields = next.getFields();
  510.                 String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
  511.                 resultMap.put("time", timeString);
  512.                 if (titleList.stream().anyMatch(str -> str.contains("."))) {
  513.                     for (int i = 0; i < fields.size(); i++) {
  514.                         Field field = fields.get(i);
  515.                         String title = titleList.get(i);
  516.                         if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
  517.                             resultMap.put(splitString(title), null);
  518.                         } else {
  519.                             resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
  520.                         }
  521.                     }
  522.                 } else {
  523.                     Field fieldName = fields.get(0);
  524.                     Field fieldValue = fields.get(1);
  525.                     Field fieldDataType = fields.get(2);
  526.                     if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
  527.                         String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
  528.                         Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString());
  529.                         resultMap.put(splitString(mapKey), mapValue);
  530.                     }
  531.                 }
  532.                 String jsonString = JSON.toJSONString(resultMap);
  533.                 resultList.add(JSON.parseObject(jsonString, (Type) clazz));
  534.             }
  535.         }
  536.         return resultList;
  537.     }
  538.     /**
  539.      * 分割获取字段名
  540.      *
  541.      * @param str
  542.      * @return 字段名
  543.      */
  544.     public static String splitString(String str) {
  545.         String[] parts = str.split("\\.");
  546.         if (parts.length <= 0) {
  547.             return str;
  548.         } else {
  549.             return parts[parts.length - 1];
  550.         }
  551.     }
  552.     /**
  553.      * 根据数据值和数据类型返回对应数据类型数据
  554.      *
  555.      * @param value    数据值
  556.      * @param typeName 数据类型
  557.      * @return 转换后的数据值
  558.      */
  559.     public static Object convertStringToType(String value, String typeName) {
  560.         String type = typeName.toLowerCase();
  561.         if (type.isEmpty()) {
  562.             return value;
  563.         }
  564.         if ("boolean".equals(type)) {
  565.             return Boolean.parseBoolean(value);
  566.         } else if ("double".equals(type)) {
  567.             return Double.parseDouble(value);
  568.         } else if ("int32".equals(type)) {
  569.             return Integer.parseInt(value);
  570.         } else if ("int64".equals(type)) {
  571.             return Long.parseLong(value);
  572.         } else if ("float".equals(type)) {
  573.             return Float.parseFloat(value);
  574.         } else if ("text".equals(type)) {
  575.             return value;
  576.         } else {
  577.             return value;
  578.         }
  579.     }
  580.     /**
  581.      * 根据对象属性的数据类型返回对应的TSDataType
  582.      *
  583.      * @param type 属性的数据类型
  584.      * @return TSDataType
  585.      */
  586.     public static TSDataType getTsDataTypeByString(String type) {
  587.         String typeName = splitString(type).toLowerCase();
  588.         if ("boolean".equals(typeName)) {
  589.             return TSDataType.BOOLEAN;
  590.         } else if ("double".equals(typeName)) {
  591.             return TSDataType.DOUBLE;
  592.         } else if ("int".equals(typeName) || "integer".equals(typeName)) {
  593.             return TSDataType.INT32;
  594.         } else if ("long".equals(typeName)) {
  595.             return TSDataType.INT64;
  596.         } else if ("float".equals(typeName)) {
  597.             return TSDataType.FLOAT;
  598.         } else if ("text".equals(typeName)) {
  599.             return TSDataType.TEXT;
  600.         } else if ("string".equals(typeName)) {
  601.             return TSDataType.TEXT;
  602.         } else {
  603.             return TSDataType.UNKNOWN;
  604.         }
  605.     }
  606.     /**
  607.      * 根据对象构建MeasurementSchemas
  608.      *
  609.      * @param obj 对象
  610.      * @return
  611.      */
  612.     @Override
  613.     public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
  614.         java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
  615.         List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field -> new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()))).collect(Collectors.toList());
  616.         return schemaList;
  617.     }
  618.     /**
  619.      * 根据对象构建MeasurementSchemaValuesDTO
  620.      *
  621.      * @param obj 对象
  622.      * @return
  623.      */
  624.     @SneakyThrows
  625.     @Override
  626.     public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
  627.         MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
  628.         java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
  629.         List<MeasurementSchema> schemaList = new ArrayList<>();
  630.         List<Object> values = new ArrayList<>();
  631.         List<Integer> valuesIsNullIndex = new ArrayList<>();
  632.         int valueIndex = 0;
  633.         for (java.lang.reflect.Field field : fields) {
  634.             MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()));
  635.             schemaList.add(measurementSchema);
  636.             Object value = field.get(obj);
  637.             if (value == null) {
  638.                 valuesIsNullIndex.add(valueIndex);
  639.             }
  640.             values.add(value);
  641.             valueIndex++;
  642.         }
  643.         measurementSchemaValuesDTO.setSchemaList(schemaList);
  644.         measurementSchemaValuesDTO.setValues(values);
  645.         return measurementSchemaValuesDTO;
  646.     }
  647. }
复制代码
1.6、IotDbTest

  1. package com.example;
  2. import com.example.dto.InsertDataDTO;
  3. import com.example.dto.MeasurementSchemaValuesDTO;
  4. import com.example.dto.ResultEntity;
  5. import com.example.dto.TestDataType;
  6. import com.example.service.IotDbService;
  7. import lombok.SneakyThrows;
  8. import org.apache.iotdb.common.rpc.thrift.TAggregationType;
  9. import org.apache.iotdb.isession.SessionDataSet;
  10. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  11. import org.apache.iotdb.tsfile.write.record.Tablet;
  12. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  13. import org.junit.Test;
  14. import org.junit.runner.RunWith;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.boot.test.context.SpringBootTest;
  17. import org.springframework.test.context.junit4.SpringRunner;
  18. import java.util.ArrayList;
  19. import java.util.List;
  20. import java.util.Map;
  21. import java.util.Random;
  22. import java.util.stream.Collectors;
  23. import java.util.stream.IntStream;
  24. @SpringBootTest
  25. @RunWith(SpringRunner.class)
  26. public class IotDbTest {
  27.     @Autowired
  28.     private IotDbService iotDbService;
  29.     private static final String ROOT_LN_WF01 = "root.ln.wf01";
  30.     private static final String ROOT_LN_WF01_WT01 = "root.ln.wf01.wt01";
  31.     private static final String ROOT_LN_WF01_WT01_STATUS = "root.ln.wf01.wt01.status";
  32.     private static final String ROOT_LN_WF01_WT01_HARDWARE = "root.ln.wf01.wt01.hardware";
  33.     private static final String ROOT_LN_WF01_WT01_TEMPERATURE = "root.ln.wf01.wt01.temperature";
  34.     private static final String SELECT_ROOT_LN_WF01_WT01 = "select * from root.ln.wf01.wt01";
  35.     private static Random random = new Random();
  36.     /**
  37.      * 测试单个设备批量插入数据
  38.      */
  39.     @Test
  40.     public void testInsertTablet() {
  41.         List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
  42.         List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(insertDataDTOS.get(0));
  43.         String deviceId = ROOT_LN_WF01_WT01;
  44.         int maxRowNumber = insertDataDTOS.size();
  45.         Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
  46.         long timestamp = System.currentTimeMillis();
  47.         for (int row = 0; row < maxRowNumber; row++) {
  48.             int rowIndex = tablet.rowSize++;
  49.             tablet.addTimestamp(rowIndex, timestamp);
  50.             MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(row));
  51.             List<Object> iotValues = measurementSchemaValuesDTO.getValues();
  52.             List<MeasurementSchema> schemaList2 = measurementSchemaValuesDTO.getSchemaList();
  53.             List<String> iotMeasurements = schemaList2.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
  54.             for (int i = 0; i < iotMeasurements.size(); i++) {
  55.                 tablet.addValue(iotMeasurements.get(i), rowIndex, iotValues.get(i));
  56.             }
  57.             if (tablet.rowSize == tablet.getMaxRowNumber()) {
  58.                 iotDbService.insertTablet(tablet);
  59.                 tablet.reset();
  60.             }
  61.             timestamp++;
  62.         }
  63.     }
  64.     /**
  65.      * 测试单个设备批量插入数据(可插入空值)
  66.      */
  67.     @Test
  68.     public void testInsertTabletWithNullValues() {
  69.         List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
  70.         List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(insertDataDTOS.get(0));
  71.         String deviceId = ROOT_LN_WF01_WT01;
  72.         int maxRowNumber = insertDataDTOS.size();
  73.         Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
  74.         long timestamp = System.currentTimeMillis();
  75.         tablet.initBitMaps();
  76.         for (int row = 0; row < maxRowNumber; row++) {
  77.             int rowIndex = tablet.rowSize++;
  78.             tablet.addTimestamp(rowIndex, timestamp);
  79.             MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(row));
  80.             List<Object> iotValues = measurementSchemaValuesDTO.getValues();
  81.             List<MeasurementSchema> schemaList2 = measurementSchemaValuesDTO.getSchemaList();
  82.             List<String> iotMeasurements = schemaList2.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
  83.             List<Integer> valueIsNullIndex = measurementSchemaValuesDTO.getValueIsNullIndex();
  84.             IntStream.range(0, iotMeasurements.size()).forEach(i -> tablet.addValue(iotMeasurements.get(i), rowIndex, iotValues.get(i)));
  85.             if (valueIsNullIndex != null && !valueIsNullIndex.isEmpty()) {
  86.                 for (Integer isNullIndex : valueIsNullIndex) {
  87.                     tablet.bitMaps[isNullIndex].mark(row);
  88.                 }
  89.             }
  90.             if (tablet.rowSize == tablet.getMaxRowNumber()) {
  91.                 iotDbService.insertTablet(tablet);
  92.                 tablet.reset();
  93.             }
  94.             timestamp++;
  95.         }
  96.     }
  97.     /**
  98.      * 测试单条数据插入
  99.      */
  100.     @Test
  101.     public void testInsertRecord() {
  102.         String deviceId = ROOT_LN_WF01_WT01;
  103.         long currentTime = System.currentTimeMillis();
  104.         InsertDataDTO insertDataDto = new InsertDataDTO().buildOne();
  105.         MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDto);
  106.         List<Object> iotValues = measurementSchemaValuesDTO.getValues();
  107.         List<MeasurementSchema> schemaList = measurementSchemaValuesDTO.getSchemaList();
  108.         List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
  109.         List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
  110.         iotDbService.insertRecord(deviceId, currentTime, iotMeasurements, tsDataTypeList, iotValues);
  111.     }
  112.     /**
  113.      * 测试单个设备多条数据插入
  114.      */
  115.     @Test
  116.     public void testInsertRecordsOfOneDevice() {
  117.         String deviceId = ROOT_LN_WF01_WT01;
  118.         List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
  119.         int recordNum = insertDataDTOS.size();
  120.         List<Long> timeList = new ArrayList<>();
  121.         List<List<String>> iotMeasurementsList = new ArrayList<>();
  122.         List<List<TSDataType>> tsDataTypesList = new ArrayList<>();
  123.         List<List<Object>> valuesList = new ArrayList<>();
  124.         for (int i = 0; i < recordNum; i++) {
  125.             long currentTime = System.currentTimeMillis();
  126.             timeList.add(currentTime + i);
  127.             MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(i));
  128.             List<Object> iotValues = measurementSchemaValuesDTO.getValues();
  129.             List<MeasurementSchema> schemaList = measurementSchemaValuesDTO.getSchemaList();
  130.             List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
  131.             List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
  132.             iotMeasurementsList.add(iotMeasurements);
  133.             tsDataTypesList.add(tsDataTypeList);
  134.             valuesList.add(iotValues);
  135.         }
  136.         iotDbService.insertRecordsOfOneDevice(deviceId, timeList, iotMeasurementsList, tsDataTypesList, valuesList);
  137.     }
  138.     /**
  139.      * 测试时间查询
  140.      */
  141.     @SneakyThrows
  142.     @Test
  143.     public void testExecuteRawDataQuery() {
  144.         List<String> paths = new ArrayList<>();
  145.         paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
  146.         paths.add(ROOT_LN_WF01_WT01_HARDWARE);
  147.         long startTime = 1718959439344L;
  148.         long endTime = System.currentTimeMillis();
  149.         long outTime = 60000;
  150.         SessionDataSet sessionDataSet = iotDbService.executeRawDataQuery(paths, startTime, endTime, outTime);
  151.         System.out.println(sessionDataSet.getColumnNames());
  152.         sessionDataSet.setFetchSize(1024); // default is 10000
  153.         while (sessionDataSet.hasNext()) {
  154.             System.out.println(sessionDataSet.next());
  155.         }
  156.         List<ResultEntity> resultEntities = iotDbService.executeRawDataQuery(paths, startTime, endTime, outTime, ResultEntity.class);
  157.         System.out.println(resultEntities);
  158.         System.out.println(resultEntities.size());
  159.     }
  160.     /**
  161.      * 测试单个设备最新时间点查询
  162.      */
  163.     @SneakyThrows
  164.     @Test
  165.     public void testExecuteLastDataQueryForOneDevice() {
  166.         String db = ROOT_LN_WF01;
  167.         String device = ROOT_LN_WF01_WT01;
  168.         List<String> sensors = new ArrayList<>();
  169.         sensors.add("temperature");
  170.         sensors.add("hardware");
  171.         sensors.add("status");
  172.         SessionDataSet sessionDataSet = iotDbService.executeLastDataQueryForOneDevice(db, device, sensors, true);
  173.         System.out.println(sessionDataSet.getColumnNames());
  174.         sessionDataSet.setFetchSize(1024); // default is 10000
  175.         while (sessionDataSet.hasNext()) {
  176.             System.out.println(sessionDataSet.next());
  177.         }
  178.         List<ResultEntity> resultEntities = iotDbService.executeLastDataQueryForOneDevice(db, device, sensors, true, ResultEntity.class);
  179.         System.out.println(resultEntities);
  180.         System.out.println(resultEntities.size());
  181.     }
  182.     /**
  183.      * 测试最新点查询
  184.      */
  185.     @SneakyThrows
  186.     @Test
  187.     public void testExecuteLastDataQuery() {
  188.         List<String> paths = new ArrayList<>();
  189.         paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
  190.         paths.add(ROOT_LN_WF01_WT01_HARDWARE);
  191.         long lastTime = 1718959439344L;
  192.         long endTime = System.currentTimeMillis();
  193.         SessionDataSet sessionDataSet = iotDbService.executeLastDataQuery(paths, lastTime);
  194.         System.out.println(sessionDataSet.getColumnNames());
  195.         sessionDataSet.setFetchSize(1024); // default is 10000
  196.         while (sessionDataSet.hasNext()) {
  197.             System.out.println(sessionDataSet.next());
  198.         }
  199.         List<ResultEntity> resultEntities = iotDbService.executeLastDataQuery(paths, lastTime, ResultEntity.class);
  200.         System.out.println(resultEntities);
  201.         System.out.println(resultEntities.size());
  202.     }
  203.     /**
  204.      * 测试聚合查询
  205.      */
  206.     @SneakyThrows
  207.     @Test
  208.     public void testExecuteAggregationQuery() {
  209.         List<String> paths = new ArrayList<>();
  210.         paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
  211.         List<TAggregationType> aggregations = new ArrayList<>();
  212.         aggregations.add(TAggregationType.SUM);
  213.         SessionDataSet sessionDataSet = iotDbService.executeAggregationQuery(paths, aggregations);
  214.         System.out.println(sessionDataSet.getColumnNames());
  215.         sessionDataSet.setFetchSize(1024); // default is 10000
  216.         while (sessionDataSet.hasNext()) {
  217.             System.out.println(sessionDataSet.next());
  218.         }
  219.     }
  220.     @SneakyThrows
  221.     @Test
  222.     public void testQueryByIterator() {
  223.         String sql = SELECT_ROOT_LN_WF01_WT01;
  224.         SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
  225.         SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
  226.         System.out.println(sessionDataSet.getColumnNames());
  227.         sessionDataSet.setFetchSize(1024); // default is 10000
  228.         while (iterator.next()) {
  229.             float aFloat = iterator.getFloat(ROOT_LN_WF01_WT01_TEMPERATURE);
  230.             String aString = iterator.getString(ROOT_LN_WF01_WT01_HARDWARE);
  231.             boolean aBoolean = iterator.getBoolean(ROOT_LN_WF01_WT01_STATUS);
  232.             System.out.println(aFloat);
  233.             System.out.println(aString);
  234.             System.out.println(aBoolean);
  235.         }
  236.     }
  237.     /**
  238.      * 测试删除数据
  239.      */
  240.     @Test
  241.     public void testDeleteData() {
  242.         List<String> paths = new ArrayList<>();
  243.         paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
  244.         paths.add(ROOT_LN_WF01_WT01_STATUS);
  245.         paths.add(ROOT_LN_WF01_WT01_HARDWARE);
  246.         long endTime = 1719212858916L;
  247.         iotDbService.deleteData(paths, endTime);
  248.     }
  249.     /**
  250.      * 测试SQL查询
  251.      */
  252.     @SneakyThrows
  253.     @Test
  254.     public void testExecuteQueryStatement() {
  255.         String sql = SELECT_ROOT_LN_WF01_WT01;
  256.         SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
  257.         System.out.println(sessionDataSet.getColumnNames());
  258.         sessionDataSet.setFetchSize(1024); // default is 10000
  259.         while (sessionDataSet.hasNext()) {
  260.             System.out.println(sessionDataSet.next()    );
  261.         }
  262.     }
  263.     /**
  264.      * 测试SQL执行(不包含查询)
  265.      */
  266.     @Test
  267.     public void testExecuteNonQueryStatement() {
  268.         String sql = "insert into root.ln.wf01.wt01(temperature, hardware, status) values (2.1, 'v2', false)";
  269.         iotDbService.executeNonQueryStatement(sql);
  270.     }
  271.     /**
  272.      * 测试封装查询结果
  273.      */
  274.     @Test
  275.     public void testPackagingMapData() {
  276.         String sql = SELECT_ROOT_LN_WF01_WT01;
  277.         SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
  278.         List<String> columnNames = sessionDataSet.getColumnNames();
  279.         List<Map<String, Object>> rtList = iotDbService.packagingMapData(sessionDataSet, columnNames);
  280.         System.out.println(rtList);
  281.     }
  282.     /**
  283.      * 测试封装查询结果
  284.      */
  285.     @Test
  286.     public void testPackagingObjectData() {
  287.         String sql = SELECT_ROOT_LN_WF01_WT01;
  288.         SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
  289.         List<String> columnNames = sessionDataSet.getColumnNames();
  290.         List<ResultEntity> resultEntities = iotDbService.packagingObjectData(sessionDataSet, columnNames, ResultEntity.class);
  291.         System.out.println(resultEntities);
  292.     }
  293.     /**
  294.      * 测试构建MeasurementSchemasAndValues
  295.      */
  296.     @Test
  297.     public void testBuildMeasurementSchema() {
  298.         TestDataType testDataType = new TestDataType();
  299.         testDataType.setTestDouble(12D);
  300.         testDataType.setTestLong(12L);
  301.         testDataType.setHardware("ss");
  302.         testDataType.setStatus(true);
  303.         testDataType.setTemperature(12.0F);
  304.         List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(testDataType);
  305.         System.out.println(schemaList);
  306.         List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
  307.         System.out.println(iotMeasurements);
  308.         List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
  309.         System.out.println(tsDataTypeList);
  310.         MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(testDataType);
  311.         System.out.println(measurementSchemaValuesDTO.getSchemaList());
  312.         System.out.println(measurementSchemaValuesDTO.getValues());
  313.     }
  314. }
复制代码
1.7、Entity & DTO


  1. package com.example.dto;
  2. import lombok.Data;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. @Data
  6. public class InsertDataDTO {
  7.     public Float temperature;
  8.     public String hardware;
  9.     public Boolean status;
  10.     public InsertDataDTO buildOne() {
  11.         InsertDataDTO insertDataDTO = new InsertDataDTO();
  12.         insertDataDTO.setHardware("ss");
  13.         insertDataDTO.setStatus(true);
  14.         insertDataDTO.setTemperature(12.0F);
  15.         return insertDataDTO;
  16.     }
  17.     public List<InsertDataDTO> buildList() {
  18.         List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
  19.         int buildNum = 10;
  20.         for (int i = 0; i < buildNum; i++) {
  21.             InsertDataDTO insertDataDTO = new InsertDataDTO();
  22.             insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
  23.             insertDataDTO.setStatus(i % 2 == 0);
  24.             insertDataDTO.setTemperature(12.0F + i);
  25.             insertDataDTOS.add(insertDataDTO);
  26.         }
  27.         return insertDataDTOS;
  28.     }
  29. }
复制代码
  1. package com.example.dto;
  2. import lombok.Data;
  3. @Data
  4. public class IotDbRecordAble {
  5. }
复制代码
  1. package com.example.dto;
  2. import lombok.Data;
  3. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  4. import java.util.List;
  5. @Data
  6. public class MeasurementSchemaValuesDTO {
  7.     List<MeasurementSchema> schemaList;
  8.     List<Object> values;
  9.     List<Integer> valueIsNullIndex;
  10. }
复制代码
  1. package com.example.dto;
  2. import lombok.Data;
  3. @Data
  4. public class ResultEntity extends IotDbRecordAble {
  5.     public Float temperature;
  6.     public String hardware;
  7.     public Boolean status;
  8.     public String time;
  9. }
复制代码
  1. package com.example.dto;
  2. import lombok.Data;
  3. @Data
  4. public class TestDataType {
  5.     public Float temperature;
  6.     public String hardware;
  7.     public Boolean status;
  8.     public Double testDouble;
  9.     public Long testLong;
  10. }
复制代码
这里利用的是Java原生接口的方式来访问数据库,进行新增、查询、删除,适用于高吞吐性能场景,大批量的数据处理。下一节内容采用mybatis的方式。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

守听

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表