说明:项目需要使用时序数据库,经调研后选用了IoTDB数据库,因此需要在SpringBoot框架内集成该数据库,以进行数据查询、插入操作。根据官方文档,Java可采用多种方式访问数据库,分别是Java原生接口、JDBC(不推荐)和REST API,下面选用Java原生接口和MyBatis两种方式操作数据库。
实际应用中,如果需要高吞吐性能,应使用Java原生接口的方式;一些小量级的增、删、查操作则可以使用MyBatis的方式。
前提:已部署安装IoTDB数据库,具体安装部署流程可查看官网文档。
1、IoTDB-Session
1.1、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>springboot-iotdb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Version comes from the Spring Boot parent's dependency management instead of the
             deprecated, non-reproducible RELEASE meta-version. Lombok is only needed at
             compile time, hence the provided scope. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.28</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- LOG4J -->
        <!-- NOTE(review): log4j 1.x is end-of-life and the classes shown in this article log
             through SLF4J; confirm whether this dependency is still required. -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- IoTDB Java native API (Session / SessionPool). -->
        <dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-session</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.51</version>
        </dependency>
    </dependencies>
</project>
1.2、application.yml
# IoTDB connection settings, injected into IotDbSessionConfig via @Value("${spring.iotdb.*}").
# NOTE(review): root/root looks like a default account - confirm, and supply real credentials
# from outside version control for any non-local deployment.
spring:
  iotdb:
    username: root
    password: root
    ip: 100.100.100.100
    port: 6667
    # Passed to the SessionPool constructor as its maxSize argument.
    maxSize: 100

# HTTP port of the embedded web server.
server:
  port: 8080
1.3、IotDbSessionConfig
- package com.example.config;
- import org.apache.iotdb.session.pool.SessionPool;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.stereotype.Component;
- @Component
- @Configuration
- public class IotDbSessionConfig {
- @Value("${spring.iotdb.username}")
- private String username;
- @Value("${spring.iotdb.password}")
- private String password;
- @Value("${spring.iotdb.ip}")
- private String ip;
- @Value("${spring.iotdb.port}")
- private int port;
- @Value("${spring.iotdb.maxSize}")
- private int maxSize;
- private static SessionPool sessionPool;
- public SessionPool getSessionPool() {
- if (sessionPool == null) {
- sessionPool = new SessionPool(ip, port, username, password, maxSize);
- }
- return sessionPool;
- }
- }
1.4、IotDbService
- package com.example.service;
- import com.example.dto.IotDbRecordAble;
- import com.example.dto.MeasurementSchemaValuesDTO;
- import org.apache.iotdb.common.rpc.thrift.TAggregationType;
- import org.apache.iotdb.isession.SessionDataSet;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- import org.apache.iotdb.tsfile.write.record.Tablet;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
- import java.util.List;
- import java.util.Map;
- public interface IotDbService {
- void insertTablet(Tablet tablet);
- void insertTablets(Map<String, Tablet> tablets);
- void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
- void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values);
- void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
- void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
- void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
- void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
- void deleteData(String path, long endTime);
- void deleteData(List<String> paths, long endTime);
- SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut);
- <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut, Class<? extends IotDbRecordAble> clazz);
- SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
- <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz);
- SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes);
- <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz);
- SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
- SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime);
- SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval);
- SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep);
- SessionDataSet executeQueryStatement(String sql);
- void executeNonQueryStatement(String sql);
- List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> columnNames);
- /**
- *(不支持聚合查询)
- **/
- <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble> clazz);
- List<MeasurementSchema> buildMeasurementSchemas(Object object);
- MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object);
- }
1.5、IotDbServiceImpl
- package com.example.service.impl;
- import com.alibaba.fastjson2.JSON;
- import com.example.config.IotDbSessionConfig;
- import com.example.dto.IotDbRecordAble;
- import com.example.dto.MeasurementSchemaValuesDTO;
- import com.example.service.IotDbService;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.iotdb.common.rpc.thrift.TAggregationType;
- import org.apache.iotdb.isession.SessionDataSet;
- import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
- import org.apache.iotdb.session.pool.SessionPool;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- import org.apache.iotdb.tsfile.read.common.Field;
- import org.apache.iotdb.tsfile.read.common.RowRecord;
- import org.apache.iotdb.tsfile.write.record.Tablet;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.lang.reflect.Type;
- import java.util.*;
- import java.util.stream.Collectors;
- @Service
- @Slf4j
- public class IotDbServiceImpl implements IotDbService {
- @Autowired
- private IotDbSessionConfig iotDBSessionConfig;
- /**
- * 单设备批量插入数据
- *
- * @param tablet
- */
- @Override
- public void insertTablet(Tablet tablet) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:tablet:[{}]", tablet);
- sessionPool.insertTablet(tablet);
- } catch (Exception e) {
- log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage());
- }
- }
- /**
- * 多设备批量插入数据
- *
- * @param tablets
- */
- @Override
- public void insertTablets(Map<String, Tablet> tablets) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:tablets:[{}]", tablets);
- sessionPool.insertTablets(tablets);
- } catch (Exception e) {
- log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage());
- }
- }
- /**
- * 单条数据插入(string类型数据项)
- *
- * @param deviceId 设备名(表名)root.ln.wf01.wt01
- * @param time 时间戳
- * @param measurements 数据项列表
- * @param values 数据项对应值列表
- */
- @Override
- public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
- sessionPool.insertRecord(deviceId, time, measurements, values);
- } catch (Exception e) {
- log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
- deviceId, time, measurements, values, e.getMessage());
- }
- }
- /**
- * 单条数据插入(不同类型数据项)
- *
- * @param deviceId 设备名(表名)root.ln.wf01.wt01
- * @param time 时间戳
- * @param measurements 数据项列表
- * @param types 数据项对应类型列表
- * @param values 数据项对应值列表
- */
- @Override
- public void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values);
- sessionPool.insertRecord(deviceId, time, measurements, types, values);
- } catch (Exception e) {
- log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}",
- deviceId, time, measurements, types, values, e.getMessage());
- }
- }
- /**
- * 多个设备多条数据插入(string类型数据项)
- *
- * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01
- * @param times 时间戳的列表
- * @param measurementsList 数据项列表的列表
- * @param valuesList 数据项对应值列表的列表
- */
- @Override
- public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList);
- sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
- } catch (Exception e) {
- log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}",
- deviceIds, times, measurementsList, valuesList, e.getMessage());
- }
- }
- /**
- * 多个设备多条数据插入(不同类型数据项)
- *
- * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01
- * @param times 时间戳的列表
- * @param measurementsList 数据项列表的列表
- * @param typesList 数据项对应类型列表的列表
- * @param valuesList 数据项对应值列表的列表
- */
- @Override
- public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList);
- sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
- } catch (Exception e) {
- log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}",
- deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
- }
- }
- /**
- * 单个设备多条数据插入(string类型数据项)
- *
- * @param deviceId 单个设备名(表名))root.ln.wf01.wt01
- * @param times 时间戳的列表
- * @param measurementsList 数据项列表的列表
- * @param valuesList 数据项对应值列表的列表
- */
- @Override
- public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList);
- sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
- } catch (Exception e) {
- log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}",
- deviceId, times, measurementsList, valuesList, e.getMessage());
- }
- }
- /**
- * 单个设备多条数据插入(不同类型数据项)
- *
- * @param deviceId 单个设备名(表名))root.ln.wf01.wt01
- * @param times 时间戳的列表
- * @param measurementsList 数据项列表的列表
- * @param typesList 数据项对应类型列表的列表
- * @param valuesList 数据项对应值列表的列表
- */
- @Override
- public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList);
- sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
- } catch (Exception e) {
- log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
- }
- }
- /**
- * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据)
- *
- * @param path 单个字段 root.ln.wf01.wt01.temperature
- * @param endTime 删除时间点
- */
- @Override
- public void deleteData(String path, long endTime) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime);
- sessionPool.deleteData(path, endTime);
- } catch (Exception e) {
- log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
- }
- }
- /**
- * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param endTime 删除时间点
- */
- @Override
- public void deleteData(List<String> paths, long endTime) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime);
- sessionPool.deleteData(paths, endTime);
- } catch (Exception e) {
- log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
- }
- }
- /**
- * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param startTime 开始时间
- * @param endTime 结束时间
- * @param outTime 超时时间
- * @return SessionDataSet (Time,paths)
- */
- @Override
- public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime);
- sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
- }
- return null;
- }
- /**
- * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param startTime 开始时间
- * @param endTime 结束时间
- * @param outTime 超时时间
- * @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
- * @param <T>
- * @return
- */
- @Override
- public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime, Class<? extends IotDbRecordAble> clazz) {
- SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<T> resultEntities = null;
- try {
- resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
- } catch (Exception e) {
- log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
- }
- return resultEntities;
- }
- /**
- * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param lastTime 结束时间
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime);
- sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage());
- }
- return null;
- }
- /**
- * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
- *
- * @param <T>
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param lastTime 结束时间
- * @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
- * @return
- */
- @Override
- public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz) {
- SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<T> resultEntities = null;
- try {
- resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
- } catch (Exception e) {
- log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage());
- }
- return resultEntities;
- }
- /**
- * 最新点查询(快速查询单设备下指定序列最新点)
- *
- * @param db root.ln.wf01
- * @param device root.ln.wf01.wt01
- * @param sensors temperature,status(字段名)
- * @param isLegalPathNodes true(避免路径校验)
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes);
- sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
- }
- return null;
- }
- /**
- * @param db root.ln.wf01
- * @param device root.ln.wf01.wt01
- * @param sensors temperature,status(字段名)
- * @param isLegalPathNodes true(避免路径校验)
- * @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
- * @param <T>
- * @return
- */
- @Override
- public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz) {
- SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<T> resultEntities = null;
- try {
- resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
- } catch (Exception e) {
- log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
- }
- return resultEntities;
- }
- /**
- * 聚合查询
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations);
- sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", paths, aggregations, e.getMessage());
- }
- return null;
- }
- /**
- * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
- * @param startTime 开始时间(包含)
- * @param endTime 结束时间
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", paths, aggregations, startTime, endTime);
- sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
- }
- return null;
- }
- /**
- * 聚合查询(支持按照时间区间分段查询)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
- * @param startTime 开始时间(包含)
- * @param endTime 结束时间
- * @param interval
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval);
- sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage());
- }
- return null;
- }
- /**
- * 聚合查询(支持按照时间区间分段查询)
- *
- * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
- * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
- * @param startTime 开始时间(包含)
- * @param endTime 结束时间
- * @param interval
- * @param slidingStep
- * @return SessionDataSet
- */
- @Override
- public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
- sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
- }
- return null;
- }
- /**
- * SQL查询
- *
- * @param sql
- * @return
- */
- @Override
- public SessionDataSet executeQueryStatement(String sql) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- SessionDataSetWrapper sessionDataSetWrapper = null;
- try {
- log.info("iotdb SQL查询:sql:[{}]", sql);
- sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
- return sessionDataSetWrapper.getSessionDataSet();
- } catch (Exception e) {
- log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
- }
- return null;
- }
- /**
- * SQL非查询
- *
- * @param sql
- */
- @Override
- public void executeNonQueryStatement(String sql) {
- SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
- try {
- log.info("iotdb SQL无查询:sql:[{}]", sql);
- sessionPool.executeNonQueryStatement(sql);
- } catch (Exception e) {
- log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
- }
- }
- /**
- * 封装处理数据
- *
- * @param sessionDataSet
- * @param titleList
- */
- @SneakyThrows
- @Override
- public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
- int fetchSize = sessionDataSet.getFetchSize();
- List<Map<String, Object>> resultList = new ArrayList<>();
- titleList.remove("Time");
- if (fetchSize > 0) {
- while (sessionDataSet.hasNext()) {
- Map<String, Object> resultMap = new HashMap<>();
- RowRecord next = sessionDataSet.next();
- List<Field> fields = next.getFields();
- String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
- resultMap.put("time", timeString);
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
- resultMap.put(splitString(titleList.get(i)), null);
- } else {
- resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString());
- }
- }
- resultList.add(resultMap);
- }
- }
- return resultList;
- }
- /**
- * 封装处理数据(不支持聚合查询)
- *
- * @param sessionDataSet 查询返回的结果集
- * @param titleList 查询返回的结果集内的字段名
- * @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
- * @param <T>
- * @return
- */
- @SneakyThrows
- @Override
- public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble> clazz) {
- int fetchSize = sessionDataSet.getFetchSize();
- List<T> resultList = new ArrayList<>();
- titleList.remove("Time");
- if (fetchSize > 0) {
- while (sessionDataSet.hasNext()) {
- Map<String, Object> resultMap = new HashMap<>();
- RowRecord next = sessionDataSet.next();
- List<Field> fields = next.getFields();
- String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
- resultMap.put("time", timeString);
- if (titleList.stream().anyMatch(str -> str.contains("."))) {
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- String title = titleList.get(i);
- if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
- resultMap.put(splitString(title), null);
- } else {
- resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
- }
- }
- } else {
- Field fieldName = fields.get(0);
- Field fieldValue = fields.get(1);
- Field fieldDataType = fields.get(2);
- if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
- String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
- Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString());
- resultMap.put(splitString(mapKey), mapValue);
- }
- }
- String jsonString = JSON.toJSONString(resultMap);
- resultList.add(JSON.parseObject(jsonString, (Type) clazz));
- }
- }
- return resultList;
- }
- /**
- * 分割获取字段名
- *
- * @param str
- * @return 字段名
- */
- public static String splitString(String str) {
- String[] parts = str.split("\\.");
- if (parts.length <= 0) {
- return str;
- } else {
- return parts[parts.length - 1];
- }
- }
- /**
- * 根据数据值和数据类型返回对应数据类型数据
- *
- * @param value 数据值
- * @param typeName 数据类型
- * @return 转换后的数据值
- */
- public static Object convertStringToType(String value, String typeName) {
- String type = typeName.toLowerCase();
- if (type.isEmpty()) {
- return value;
- }
- if ("boolean".equals(type)) {
- return Boolean.parseBoolean(value);
- } else if ("double".equals(type)) {
- return Double.parseDouble(value);
- } else if ("int32".equals(type)) {
- return Integer.parseInt(value);
- } else if ("int64".equals(type)) {
- return Long.parseLong(value);
- } else if ("float".equals(type)) {
- return Float.parseFloat(value);
- } else if ("text".equals(type)) {
- return value;
- } else {
- return value;
- }
- }
- /**
- * 根据对象属性的数据类型返回对应的TSDataType
- *
- * @param type 属性的数据类型
- * @return TSDataType
- */
- public static TSDataType getTsDataTypeByString(String type) {
- String typeName = splitString(type).toLowerCase();
- if ("boolean".equals(typeName)) {
- return TSDataType.BOOLEAN;
- } else if ("double".equals(typeName)) {
- return TSDataType.DOUBLE;
- } else if ("int".equals(typeName) || "integer".equals(typeName)) {
- return TSDataType.INT32;
- } else if ("long".equals(typeName)) {
- return TSDataType.INT64;
- } else if ("float".equals(typeName)) {
- return TSDataType.FLOAT;
- } else if ("text".equals(typeName)) {
- return TSDataType.TEXT;
- } else if ("string".equals(typeName)) {
- return TSDataType.TEXT;
- } else {
- return TSDataType.UNKNOWN;
- }
- }
- /**
- * 根据对象构建MeasurementSchemas
- *
- * @param obj 对象
- * @return
- */
- @Override
- public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
- java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
- List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field -> new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()))).collect(Collectors.toList());
- return schemaList;
- }
- /**
- * 根据对象构建MeasurementSchemaValuesDTO
- *
- * @param obj 对象
- * @return
- */
- @SneakyThrows
- @Override
- public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
- java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
- List<MeasurementSchema> schemaList = new ArrayList<>();
- List<Object> values = new ArrayList<>();
- List<Integer> valuesIsNullIndex = new ArrayList<>();
- int valueIndex = 0;
- for (java.lang.reflect.Field field : fields) {
- MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()));
- schemaList.add(measurementSchema);
- Object value = field.get(obj);
- if (value == null) {
- valuesIsNullIndex.add(valueIndex);
- }
- values.add(value);
- valueIndex++;
- }
- measurementSchemaValuesDTO.setSchemaList(schemaList);
- measurementSchemaValuesDTO.setValues(values);
- return measurementSchemaValuesDTO;
- }
- }
1.6、IotDbTest
- package com.example;
- import com.example.dto.InsertDataDTO;
- import com.example.dto.MeasurementSchemaValuesDTO;
- import com.example.dto.ResultEntity;
- import com.example.dto.TestDataType;
- import com.example.service.IotDbService;
- import lombok.SneakyThrows;
- import org.apache.iotdb.common.rpc.thrift.TAggregationType;
- import org.apache.iotdb.isession.SessionDataSet;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- import org.apache.iotdb.tsfile.write.record.Tablet;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.stream.Collectors;
- import java.util.stream.IntStream;
- @SpringBootTest
- @RunWith(SpringRunner.class)
- public class IotDbTest {
- @Autowired
- private IotDbService iotDbService;
- private static final String ROOT_LN_WF01 = "root.ln.wf01";
- private static final String ROOT_LN_WF01_WT01 = "root.ln.wf01.wt01";
- private static final String ROOT_LN_WF01_WT01_STATUS = "root.ln.wf01.wt01.status";
- private static final String ROOT_LN_WF01_WT01_HARDWARE = "root.ln.wf01.wt01.hardware";
- private static final String ROOT_LN_WF01_WT01_TEMPERATURE = "root.ln.wf01.wt01.temperature";
- private static final String SELECT_ROOT_LN_WF01_WT01 = "select * from root.ln.wf01.wt01";
- private static Random random = new Random();
- /**
- * 测试单个设备批量插入数据
- */
- @Test
- public void testInsertTablet() {
- List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
- List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(insertDataDTOS.get(0));
- String deviceId = ROOT_LN_WF01_WT01;
- int maxRowNumber = insertDataDTOS.size();
- Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
- long timestamp = System.currentTimeMillis();
- for (int row = 0; row < maxRowNumber; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(row));
- List<Object> iotValues = measurementSchemaValuesDTO.getValues();
- List<MeasurementSchema> schemaList2 = measurementSchemaValuesDTO.getSchemaList();
- List<String> iotMeasurements = schemaList2.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
- for (int i = 0; i < iotMeasurements.size(); i++) {
- tablet.addValue(iotMeasurements.get(i), rowIndex, iotValues.get(i));
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- iotDbService.insertTablet(tablet);
- tablet.reset();
- }
- timestamp++;
- }
- }
- /**
- * 测试单个设备批量插入数据(可插入空值)
- */
- @Test
- public void testInsertTabletWithNullValues() {
- List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
- List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(insertDataDTOS.get(0));
- String deviceId = ROOT_LN_WF01_WT01;
- int maxRowNumber = insertDataDTOS.size();
- Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
- long timestamp = System.currentTimeMillis();
- tablet.initBitMaps();
- for (int row = 0; row < maxRowNumber; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(row));
- List<Object> iotValues = measurementSchemaValuesDTO.getValues();
- List<MeasurementSchema> schemaList2 = measurementSchemaValuesDTO.getSchemaList();
- List<String> iotMeasurements = schemaList2.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
- List<Integer> valueIsNullIndex = measurementSchemaValuesDTO.getValueIsNullIndex();
- IntStream.range(0, iotMeasurements.size()).forEach(i -> tablet.addValue(iotMeasurements.get(i), rowIndex, iotValues.get(i)));
- if (valueIsNullIndex != null && !valueIsNullIndex.isEmpty()) {
- for (Integer isNullIndex : valueIsNullIndex) {
- tablet.bitMaps[isNullIndex].mark(row);
- }
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- iotDbService.insertTablet(tablet);
- tablet.reset();
- }
- timestamp++;
- }
- }
- /**
- * 测试单条数据插入
- */
- @Test
- public void testInsertRecord() {
- String deviceId = ROOT_LN_WF01_WT01;
- long currentTime = System.currentTimeMillis();
- InsertDataDTO insertDataDto = new InsertDataDTO().buildOne();
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDto);
- List<Object> iotValues = measurementSchemaValuesDTO.getValues();
- List<MeasurementSchema> schemaList = measurementSchemaValuesDTO.getSchemaList();
- List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
- List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
- iotDbService.insertRecord(deviceId, currentTime, iotMeasurements, tsDataTypeList, iotValues);
- }
- /**
- * 测试单个设备多条数据插入
- */
- @Test
- public void testInsertRecordsOfOneDevice() {
- String deviceId = ROOT_LN_WF01_WT01;
- List<InsertDataDTO> insertDataDTOS = new InsertDataDTO().buildList();
- int recordNum = insertDataDTOS.size();
- List<Long> timeList = new ArrayList<>();
- List<List<String>> iotMeasurementsList = new ArrayList<>();
- List<List<TSDataType>> tsDataTypesList = new ArrayList<>();
- List<List<Object>> valuesList = new ArrayList<>();
- for (int i = 0; i < recordNum; i++) {
- long currentTime = System.currentTimeMillis();
- timeList.add(currentTime + i);
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(insertDataDTOS.get(i));
- List<Object> iotValues = measurementSchemaValuesDTO.getValues();
- List<MeasurementSchema> schemaList = measurementSchemaValuesDTO.getSchemaList();
- List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
- List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
- iotMeasurementsList.add(iotMeasurements);
- tsDataTypesList.add(tsDataTypeList);
- valuesList.add(iotValues);
- }
- iotDbService.insertRecordsOfOneDevice(deviceId, timeList, iotMeasurementsList, tsDataTypesList, valuesList);
- }
- /**
- * 测试时间查询
- */
- @SneakyThrows
- @Test
- public void testExecuteRawDataQuery() {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
- paths.add(ROOT_LN_WF01_WT01_HARDWARE);
- long startTime = 1718959439344L;
- long endTime = System.currentTimeMillis();
- long outTime = 60000;
- SessionDataSet sessionDataSet = iotDbService.executeRawDataQuery(paths, startTime, endTime, outTime);
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- List<ResultEntity> resultEntities = iotDbService.executeRawDataQuery(paths, startTime, endTime, outTime, ResultEntity.class);
- System.out.println(resultEntities);
- System.out.println(resultEntities.size());
- }
- /**
- * 测试单个设备最新时间点查询
- */
- @SneakyThrows
- @Test
- public void testExecuteLastDataQueryForOneDevice() {
- String db = ROOT_LN_WF01;
- String device = ROOT_LN_WF01_WT01;
- List<String> sensors = new ArrayList<>();
- sensors.add("temperature");
- sensors.add("hardware");
- sensors.add("status");
- SessionDataSet sessionDataSet = iotDbService.executeLastDataQueryForOneDevice(db, device, sensors, true);
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- List<ResultEntity> resultEntities = iotDbService.executeLastDataQueryForOneDevice(db, device, sensors, true, ResultEntity.class);
- System.out.println(resultEntities);
- System.out.println(resultEntities.size());
- }
- /**
- * 测试最新点查询
- */
- @SneakyThrows
- @Test
- public void testExecuteLastDataQuery() {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
- paths.add(ROOT_LN_WF01_WT01_HARDWARE);
- long lastTime = 1718959439344L;
- long endTime = System.currentTimeMillis();
- SessionDataSet sessionDataSet = iotDbService.executeLastDataQuery(paths, lastTime);
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- List<ResultEntity> resultEntities = iotDbService.executeLastDataQuery(paths, lastTime, ResultEntity.class);
- System.out.println(resultEntities);
- System.out.println(resultEntities.size());
- }
- /**
- * 测试聚合查询
- */
- @SneakyThrows
- @Test
- public void testExecuteAggregationQuery() {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
- List<TAggregationType> aggregations = new ArrayList<>();
- aggregations.add(TAggregationType.SUM);
- SessionDataSet sessionDataSet = iotDbService.executeAggregationQuery(paths, aggregations);
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- }
- @SneakyThrows
- @Test
- public void testQueryByIterator() {
- String sql = SELECT_ROOT_LN_WF01_WT01;
- SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
- SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (iterator.next()) {
- float aFloat = iterator.getFloat(ROOT_LN_WF01_WT01_TEMPERATURE);
- String aString = iterator.getString(ROOT_LN_WF01_WT01_HARDWARE);
- boolean aBoolean = iterator.getBoolean(ROOT_LN_WF01_WT01_STATUS);
- System.out.println(aFloat);
- System.out.println(aString);
- System.out.println(aBoolean);
- }
- }
- /**
- * 测试删除数据
- */
- @Test
- public void testDeleteData() {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_LN_WF01_WT01_TEMPERATURE);
- paths.add(ROOT_LN_WF01_WT01_STATUS);
- paths.add(ROOT_LN_WF01_WT01_HARDWARE);
- long endTime = 1719212858916L;
- iotDbService.deleteData(paths, endTime);
- }
- /**
- * 测试SQL查询
- */
- @SneakyThrows
- @Test
- public void testExecuteQueryStatement() {
- String sql = SELECT_ROOT_LN_WF01_WT01;
- SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024); // default is 10000
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next() );
- }
- }
- /**
- * 测试SQL执行(不包含查询)
- */
- @Test
- public void testExecuteNonQueryStatement() {
- String sql = "insert into root.ln.wf01.wt01(temperature, hardware, status) values (2.1, 'v2', false)";
- iotDbService.executeNonQueryStatement(sql);
- }
- /**
- * 测试封装查询结果
- */
- @Test
- public void testPackagingMapData() {
- String sql = SELECT_ROOT_LN_WF01_WT01;
- SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<Map<String, Object>> rtList = iotDbService.packagingMapData(sessionDataSet, columnNames);
- System.out.println(rtList);
- }
- /**
- * 测试封装查询结果
- */
- @Test
- public void testPackagingObjectData() {
- String sql = SELECT_ROOT_LN_WF01_WT01;
- SessionDataSet sessionDataSet = iotDbService.executeQueryStatement(sql);
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<ResultEntity> resultEntities = iotDbService.packagingObjectData(sessionDataSet, columnNames, ResultEntity.class);
- System.out.println(resultEntities);
- }
- /**
- * 测试构建MeasurementSchemasAndValues
- */
- @Test
- public void testBuildMeasurementSchema() {
- TestDataType testDataType = new TestDataType();
- testDataType.setTestDouble(12D);
- testDataType.setTestLong(12L);
- testDataType.setHardware("ss");
- testDataType.setStatus(true);
- testDataType.setTemperature(12.0F);
- List<MeasurementSchema> schemaList = iotDbService.buildMeasurementSchemas(testDataType);
- System.out.println(schemaList);
- List<String> iotMeasurements = schemaList.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList());
- System.out.println(iotMeasurements);
- List<TSDataType> tsDataTypeList = schemaList.stream().map(MeasurementSchema::getType).collect(Collectors.toList());
- System.out.println(tsDataTypeList);
- MeasurementSchemaValuesDTO measurementSchemaValuesDTO = iotDbService.buildMeasurementSchemasAndValues(testDataType);
- System.out.println(measurementSchemaValuesDTO.getSchemaList());
- System.out.println(measurementSchemaValuesDTO.getValues());
- }
- }
1.7、Entity & DTO
- package com.example.dto;
- import lombok.Data;
- import java.util.ArrayList;
- import java.util.List;
- @Data
- public class InsertDataDTO {
- public Float temperature;
- public String hardware;
- public Boolean status;
- public InsertDataDTO buildOne() {
- InsertDataDTO insertDataDTO = new InsertDataDTO();
- insertDataDTO.setHardware("ss");
- insertDataDTO.setStatus(true);
- insertDataDTO.setTemperature(12.0F);
- return insertDataDTO;
- }
- public List<InsertDataDTO> buildList() {
- List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
- int buildNum = 10;
- for (int i = 0; i < buildNum; i++) {
- InsertDataDTO insertDataDTO = new InsertDataDTO();
- insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
- insertDataDTO.setStatus(i % 2 == 0);
- insertDataDTO.setTemperature(12.0F + i);
- insertDataDTOS.add(insertDataDTO);
- }
- return insertDataDTOS;
- }
- }
复制代码- package com.example.dto;
- import lombok.Data;
- @Data
- public class IotDbRecordAble {
- }
复制代码- package com.example.dto;
- import lombok.Data;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
- import java.util.List;
- @Data
- public class MeasurementSchemaValuesDTO {
- List<MeasurementSchema> schemaList;
- List<Object> values;
- List<Integer> valueIsNullIndex;
- }
复制代码- package com.example.dto;
- import lombok.Data;
- @Data
- public class ResultEntity extends IotDbRecordAble {
- public Float temperature;
- public String hardware;
- public Boolean status;
- public String time;
- }
复制代码- package com.example.dto;
- import lombok.Data;
- @Data
- public class TestDataType {
- public Float temperature;
- public String hardware;
- public Boolean status;
- public Double testDouble;
- public Long testLong;
- }
这里使用的是Java原生接口的方式来访问数据库,进行新增、查询、删除,适用于高吞吐性能场景和大批量的数据处理。下一节内容采用MyBatis的方式。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。