基于STM32的边缘计算实时数据处理可视化系统:嵌入式C++、 FreeRTOS、Kafka ...

打印 上一主题 下一主题

主题 915|帖子 915|积分 2749

一、项目概述

本项目旨在设计并实现一个基于STM32的边缘计算实时数据处理系统。该系统可以或许在边缘装备端举行数据采集、预处理,并将处理后的数据实时传输到后端服务器举行进一步分析和存储。
本项目主要解决以下问题:

  • 减轻后端服务器的数据处理负担,提高系统整体效率
  • 降低数据传输带宽需求,淘汰通讯成本
  • 实现近实时的数据分析,提高系统相应速度
  • 加强数据隐私掩护,敏感数据可在当地处理后再传输
通过边缘计算与实时数据处理相结合,本系统可广泛应用于工业物联网、智能家居、环境监测等多个范畴,为用户提供高效、安全、实时的数据分析服务。
二、系统架构

本系统采用分层架构设计,主要包括边缘层、网络层和云端层三个部门。
2.1 硬件选型



  • 边缘装备: STM32F407VGT6 微控制器
  • 传感器: DHT22温湿度传感器、BMP280气压传感器
  • 通讯模块: ESP8266 Wi-Fi模块
2.2 软件技术栈



  • 边缘端: FreeRTOS实时操作系统、STM32 HAL库
  • 网络传输: MQTT协议
  • 后端服务: Apache Kafka消息队列、Spring Boot应用服务器
  • 数据存储: InfluxDB时序数据库
  • 数据可视化: Grafana仪表板
2.3 系统架构图

     三、环境搭建

3.1 开辟环境



  • STM32CubeIDE 1.8.0
  • STM32CubeMX 6.5.0
  • FreeRTOS 10.3.1
  • ESP8266 AT固件 v3.0.0
3.2 STM32环境配置


  • 安装STM32CubeIDE
  • 使用STM32CubeMX天生项目框架
  • 配置时钟、GPIO、UART等外设
  • 启用FreeRTOS支持
配置示例:
  1. // 时钟配置
  2. RCC_OscInitTypeDef RCC_OscInitStruct = {0};
  3. RCC_ClkInitTypeDef RCC_ClkInitStruct = {0};
  4. RCC_OscInitStruct.OscillatorType = RCC_OSCILLATORTYPE_HSE;
  5. RCC_OscInitStruct.HSEState = RCC_HSE_ON;
  6. RCC_OscInitStruct.PLL.PLLState = RCC_PLL_ON;
  7. RCC_OscInitStruct.PLL.PLLSource = RCC_PLLSOURCE_HSE;
  8. RCC_OscInitStruct.PLL.PLLM = 8;
  9. RCC_OscInitStruct.PLL.PLLN = 336;
  10. RCC_OscInitStruct.PLL.PLLP = RCC_PLLP_DIV2;
  11. RCC_OscInitStruct.PLL.PLLQ = 7;
  12. RCC_ClkInitStruct.ClockType = RCC_CLOCKTYPE_HCLK|RCC_CLOCKTYPE_SYSCLK
  13.                               |RCC_CLOCKTYPE_PCLK1|RCC_CLOCKTYPE_PCLK2;
  14. RCC_ClkInitStruct.SYSCLKSource = RCC_SYSCLKSOURCE_PLLCLK;
  15. RCC_ClkInitStruct.AHBCLKDivider = RCC_SYSCLK_DIV1;
  16. RCC_ClkInitStruct.APB1CLKDivider = RCC_HCLK_DIV4;
  17. RCC_ClkInitStruct.APB2CLKDivider = RCC_HCLK_DIV2;
  18. HAL_RCC_OscConfig(&RCC_OscInitStruct);
  19. HAL_RCC_ClockConfig(&RCC_ClkInitStruct, FLASH_LATENCY_5);
  20. // UART配置
  21. huart2.Instance = USART2;
  22. huart2.Init.BaudRate = 115200;
  23. huart2.Init.WordLength = UART_WORDLENGTH_8B;
  24. huart2.Init.StopBits = UART_STOPBITS_1;
  25. huart2.Init.Parity = UART_PARITY_NONE;
  26. huart2.Init.Mode = UART_MODE_TX_RX;
  27. huart2.Init.HwFlowCtl = UART_HWCONTROL_NONE;
  28. huart2.Init.OverSampling = UART_OVERSAMPLING_16;
  29. HAL_UART_Init(&huart2);
  30. // FreeRTOS配置
  31. osThreadDef(defaultTask, StartDefaultTask, osPriorityNormal, 0, 128);
  32. defaultTaskHandle = osThreadCreate(osThread(defaultTask), NULL);
复制代码
3.3 后端环境配置


  • 安装Java JDK 11+
  • 安装Apache Kafka 2.8.0
  • 安装InfluxDB 2.0
  • 安装Grafana 8.0
Kafka配置示例 (server.properties):
  1. broker.id=0
  2. listeners=PLAINTEXT://localhost:9092
  3. num.network.threads=3
  4. num.io.threads=8
  5. socket.send.buffer.bytes=102400
  6. socket.receive.buffer.bytes=102400
  7. socket.request.max.bytes=104857600
  8. log.dirs=/tmp/kafka-logs
  9. num.partitions=1
  10. num.recovery.threads.per.data.dir=1
  11. offsets.topic.replication.factor=1
  12. transaction.state.log.replication.factor=1
  13. transaction.state.log.min.isr=1
  14. log.retention.hours=168
  15. log.segment.bytes=1073741824
  16. log.retention.check.interval.ms=300000
复制代码
注意事项:


  • 确保所有服务的端口不辩论
  • 配置适当的安全策略,如设置强暗码和启用SSL加密
  • 根据系统负载调整Kafka和InfluxDB的性能参数
四、代码实现

4.1 STM32边缘装备数据采集与预处理

  1. #include "main.h"
  2. #include "cmsis_os.h"
  3. #include "dht22.h"
  4. #include "bmp280.h"
  5. // 定义传感器数据结构
  6. typedef struct {
  7.     float temperature;
  8.     float humidity;
  9.     float pressure;
  10. } SensorData;
  11. // 数据采集任务
  12. void DataCollectionTask(void const * argument)
  13. {
  14.     SensorData data;
  15.     for(;;)
  16.     {
  17.         // 读取DHT22温湿度数据
  18.         DHT22_Read(&data.temperature, &data.humidity);
  19.         
  20.         // 读取BMP280气压数据
  21.         data.pressure = BMP280_ReadPressure();
  22.         
  23.         // 数据预处理 (示例: 简单的移动平均滤波)
  24.         static float temp_buffer[5] = {0};
  25.         static int buffer_index = 0;
  26.         
  27.         temp_buffer[buffer_index] = data.temperature;
  28.         buffer_index = (buffer_index + 1) % 5;
  29.         
  30.         float avg_temp = 0;
  31.         for(int i = 0; i < 5; i++) {
  32.             avg_temp += temp_buffer[i];
  33.         }
  34.         data.temperature = avg_temp / 5;
  35.         
  36.         // 发送数据到MQTT发布任务
  37.         xQueueSend(mqttPublishQueue, &data, portMAX_DELAY);
  38.         
  39.         osDelay(5000); // 每5秒采集一次数据
  40.     }
  41. }
  42. // MQTT发布任务
  43. void MQTTPublishTask(void const * argument)
  44. {
  45.     SensorData data;
  46.     char mqtt_message[100];
  47.     for(;;)
  48.     {
  49.         if(xQueueReceive(mqttPublishQueue, &data, portMAX_DELAY) == pdTRUE)
  50.         {
  51.             // 格式化MQTT消息
  52.             snprintf(mqtt_message, sizeof(mqtt_message),
  53.                      "{"temp":%.2f,"hum":%.2f,"press":%.2f}",
  54.                      data.temperature, data.humidity, data.pressure);
  55.             
  56.             // 通过ESP8266发送MQTT消息
  57.             ESP8266_MQTT_Publish("sensor/data", mqtt_message);
  58.         }
  59.     }
  60. }
复制代码
4.2 Spring Boot后端服务

  1. @Service
  2. public class SensorDataService {
  3.     private final KafkaTemplate<String, String> kafkaTemplate;
  4.     private final InfluxDBClient influxDBClient;
  5.     @Autowired
  6.     public SensorDataService(KafkaTemplate<String, String> kafkaTemplate, InfluxDBClient influxDBClient) {
  7.         this.kafkaTemplate = kafkaTemplate;
  8.         this.influxDBClient = influxDBClient;
  9.     }
  10.     @KafkaListener(topics = "sensor.data", groupId = "sensor-group")
  11.     public void consumeSensorData(String message) {
  12.         try {
  13.             JSONObject jsonObject = new JSONObject(message);
  14.             double temperature = jsonObject.getDouble("temp");
  15.             double humidity = jsonObject.getDouble("hum");
  16.             double pressure = jsonObject.getDouble("press");
  17.             // 存储数据到InfluxDB
  18.             Point point = Point.measurement("sensor_data")
  19.                     .addTag("sensor_id", "stm32_01")
  20.                     .addField("temperature", temperature)
  21.                     .addField("humidity", humidity)
  22.                     .addField("pressure", pressure)
  23.                     .time(Instant.now(), WritePrecision.NS);
  24.             WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
  25.             writeApi.writePoint("sensor_bucket", "org", point);
  26.             // 进行数据分析 (示例: 简单的阈值检测)
  27.             if (temperature > 30 || humidity > 80) {
  28.                 sendAlert("High temperature or humidity detected!");
  29.             }
  30.         } catch (JSONException e) {
  31.             log.error("Error parsing sensor data: ", e);
  32.         }
  33.     }
  34.     private void sendAlert(String message) {
  35.         // 实现告警逻辑,如发送邮件或推送通知
  36.         log.warn("Alert: " + message);
  37.     }
  38. }
复制代码
4.3 代码阐明

STM32边缘装备代码


  • 数据结构:

    • SensorData结构体用于存储温度、湿度和睦压数据。

  • 数据采集任务(DataCollectionTask):

    • 定期从DHT22和BMP280传感器读取数据。
    • 对温度数据举行简朴的移动平均滤波,作为数据预处理的示例。
    • 使用FreeRTOS队列(xQueueSend)将处理后的数据发送到MQTT发布任务。

  • MQTT发布任务(MQTTPublishTask):

    • 从队列吸收传感器数据。
    • 将数据格式化为JSON字符串。
    • 使用ESP8266模块通过MQTT协议发布数据。

Spring Boot后端服务代码


  • 依赖注入:

    • 使用@Autowired注入KafkaTemplate和InfluxDBClient。

  • Kafka斲丧者:

    • @KafkaListener注解用于订阅"sensor.data"主题。
    • consumeSensorData方法处理吸收到的传感器数据。

  • 数据处理:

    • 解析JSON格式的传感器数据。
    • 创建InfluxDB的Point对象,添加丈量值、标签和字段。

  • 数据存储:

    • 使用InfluxDB的WriteApiBlocking将数据点写入数据库。

  • 简朴分析:

    • 实现了基本的阈值检测,当温度或湿度高出设定值时触发告警。

  • 错误处理:

    • 使用try-catch块捕捉JSON解析错误,并记载日志。

这些代码实现了从STM32边缘装备采集数据,通过MQTT传输,再由Spring Boot后端吸收并存储到InfluxDB的完备流程。系统设计思量了实时性、可靠性和可扩展性,为物联网应用提供了一个solid的基础架构。
4.4 Grafana数据可视化


  • 在Grafana中添加InfluxDB数据源
  • 创建新的Dashboard
  • 添加Panel,使用InfluxQL查询语言配置数据展示
示例查询:
  1. SELECT mean("temperature") AS "avg_temp",
  2.        mean("humidity") AS "avg_hum",
  3.        mean("pressure") AS "avg_press"
  4. FROM "sensor_data"
  5. WHERE $timeFilter
  6. GROUP BY time($__interval)
复制代码
五、项目总结

本项目成功实现了一个基于STM32的边缘计算实时数据处理系统,主要功能和实现过程如下:

  • 数据采集: 使用DHT22和BMP280传感器采集温度、湿度和睦压数据。
  • 边缘处理: 在STM32上举行数据预处理,如滤波和压缩。
  • 数据传输: 利用ESP8266 Wi-Fi模块和MQTT协议将数据实时传输到云端。
  • 消息队列: 使用Apache Kafka实现高吞吐量的消息处理。
  • 后端服务: Spring Boot应用吸收并处理传感器数据,实现数据存储和简朴的分析。
  • 数据存储: 采用InfluxDB时序数据库存储大量传感器数据。
  • 数据可视化: 使用Grafana创建直观的数据仪表板。
通过这个系统,我们成功地将数据处理前移到边缘装备,减轻了后端服务器的负担,同时实现了近实时的数据分析和可视化。该系统具有精良的可扩展性,可以根据需求添加更多的传感器和分析功能。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表