前言
本文我将介绍我和我的团队自主研发计划的一款AI产品的成果展示——“基于视频AI识别技术的煤矿安全生产管理系统”。
这款产品是目前我在创业阶段和几位矿业大学的博士共同从架构计划、开发到交付的全过程中首次在博客频道发布, 我之前一直想写但没有时机来整理这套系统的架构, 因此我也特别感谢CSDN平台提供了这个时机,让我有了一定的动力来整理并分享这套系统的架构。
本文主要介绍的是系统的抽芽,系统的架构计划,业务模块的划分,容灾恢复等多个方面来介绍这款产品。
我相信这将为系统架构计划,业务模块划分,容灾业务处理本领感到狐疑的技术开发人员也能提供有价值的思绪。
人工智能在煤矿安全检测领域的发展趋势
比年来,在中美两国的AI发展中,仍然存在一定的差异。美国在AI基础研究和技术创新方面处于领先地位,而且他们拥有丰富的资金和人才支持。而中国则在应用层面敏捷追赶,特别是在工业应用和智能化建立方面取得了显着希望, 中国的AI发展更注意财产协同,推动技术与现实应用的结合,以实现更高效的生产和管理。
随着AI技术在国内的迅猛发展,AI正在重塑各个行业,尤其是在煤矿安全检测领域。煤矿业作为国民经济发展的关键支柱型财产,其智能化建立对于促进矿山安全妥当发展、确保国家能源资源安全具有举足轻重的意义。比年来,我国矿山智能化建立步伐加快,成效显着,然而仍面临发展不均衡、不充实及不协调等挑战,亟需进一步优化与提升。为深入贯彻落实《中共中心办公厅 国务院办公厅关于进一步加强矿山安全生产工作的意见》 ,大家可以参考关于印发《关于深入推进矿山智能化建立 促进矿山安全发展的指导意见》的通知 检察具体政策意见。除此之外,国家能源局也出台了关于进一步加快煤矿智能化建立促进煤炭高质量发展的通知
明白要求加强新一代通讯技术、人工智能(AI)、数据中心等信息基础办法建立,鼓励具备条件的地方建立煤炭工业互联网平台,渐渐实现煤矿生产、经营、管理等数据的智能分析和统一管理。推广AI视频智能监控、井下高精度定位、露天矿边坡监测预警等系统,强化关键地区、重点岗位的实时监控。
可见将来AI领域将对煤矿业的生产带来生产服从,安全的提升,优化资源管理和故障管理维护的猜测,促进智能化转型等多方面的影响,安全性将产生非常深远重大的意义。
项目的抽芽
自2011年结业以来,我首先在IBM Platform Computing深耕假造云计算领域,积聚了近两年的宝贵履历。随后,我转战淘米网与网易,投身于游戏开发行业,不断拓展自己的技术边界。之后,我决定踏上创业之路,专注于人工智能领域的研发与创新。一次偶然的机遇,我在CSDN上与几位来自矿业大学的博士都对人工智能发展趋势,模子训练聊的投机,同舟共济,仿佛“桃园结义”,所以我们又共同致力于推动AI技术在煤矿行业的深度应用与发展,一步步妥当前行。
在这样的矿业背景和AI发展趋势下,我们从2014年3月份开始计划一款基于视频AI识别技术的煤矿安全生产管理系统计划,并敏捷推广给西北各个煤矿企业,我主要负责全平台的系统架构计划。
系统架构计划
系统架构计划是软件开发和部署中的核心环节,尤其在煤矿行业这样复杂且高风险的环境中,架构计划需要综合思量多种因素,以确保系统的性能、可靠性和安全性。以下将从通用架构计划因素和煤矿行业特定环境因素两个方面进行分析。
系统架构计划需要思量的因素
在计划系统架构时(有对服务器架构计划开发感兴趣的可以订阅专栏 游戏服务器开发专栏 缓存计划专栏),除了要思量通用的业务需求,性能,可扩展性,高可用,安全性以及成本控制等方面之外,作为煤矿行业,他的环境又有一定的特殊性,这就决定了系统架构计划需要额外思量以下环境因素:
1. 复杂的井上井下环境
- 高湿度和粉尘浓度
煤矿井下环境通常湿度大且粉尘浓度高,这对硬件装备的耐用性和系统的稳定性提出了更高要求。需要选择防尘、防潮的装备,并计划冗余系统以应对装备故障。
- 有限的网络条件
井下井上网络通常带宽有限且延迟较高,系统需要优化数据传输协议,减少带宽占用,并支持离线模式以应对网络中断。
2. 安全性和实时性
- 安全监测需求
煤矿行业需要实时监测瓦斯浓度、温度、压力等环境参数,以防止安全变乱。系统必须具备高实时性和可靠性,确保数据收罗和处理的实时性。
- 应急响应本领
系统需要支持快速响应机制,例如在检测到危险环境时,能够自动触发报警并执行预设的应急方案。
3. 系统的可扩展性
- 模块化计划
煤矿系统通常需要集成多种功能模块(如装备监控、人员定位、生产调度等)。模块化计划可以提高系统的灵活性,便于后续功能扩展。
- 分布式架构
由于煤矿作业地区分散,系统需要接纳分布式架构,以支持多地区协同工作,同时制止单点故障。
4. 法规与合规性
煤矿行业受严格的安全法规约束,系统计划必须符合相干尺度(如数据存储合规性、隐私掩护等)
需求简要阐明
这里我简要阐明下系统计划的需求部分:操作人员可以通过任何一台电脑用浏览器登陆系统页面,部署视频监控,硬件嵌入式盒子,算法和模子的信息,部署告警的地区,设置的算法一旦触发告警战略,会立即通知给管理平台,并通过音柱对告警地点地区进行语音告警,可分析出某一个时间段内所有告警范例的趋势并针对告警趋势进行预判猜测。
以下是目前常见的煤矿安全生产过程中需要检测的算法:
告警分类 算法
人员违规及异常举动 人员违规穿越皮带,人员脱岗举动,人员在岗睡觉,规范佩戴安全帽,
规范穿戴反光衣,打电话,吸烟,人员聚集,固定场所巡检举动
主运输系统隐患管理 皮带大块煤检测,皮带异物检测,皮带跑偏检测,皮带空载
皮带煤量识别,烟火检测,高温检测,火焰检测,吸烟检测
破裂站人员报警,人员靠近仓口预警
车辆运输异常举动识别 人员下车检测,超时停车检测
软件架构计划
一、应用与服务层
该层位于架构的最顶端,直接与用户和业务需求对接。它包罗以下关键组件:
- 视频结构化:在视频结构化过程中,通过RTSP等协议传输的视频流可以被接入随处理系统中。处理系统会对这些视频流进行解码、去噪、稳定处理等预处理操作,然后,利用算法对预处理后的视频流进行特征提取和对象识别等处理,从而提取出视频中的关键信息,以便于后续的应用和分析。
- 业务模块:如“人员违规举动”、“车辆结构化”、“火点烟雾检测”等,这些模块实现了具体的业务逻辑和功能需求。
- 算法模子与特征提取:负责处理和分析数据,提取有用信息,并应用算法模子进行猜测和决议。
- 态势猜测:通过态势猜测,基于当前和汗青数据,对将来一段时间内的发展趋势进行预估,从而为决议制定提供科学依据,实时发现并预警埋伏的风险和问题,使相干人员能够提前采取步伐进行防范,从而降低损失。
二、模子与数据处理层
该层位于应用与服务层之下,专注于算法模子的部署、数据处理和特征提取。它包罗:
- BMC引擎与BMCV Engine:专为特定业务场景计划的算法引擎,提供高效的模子推理和数据处理本领。
- OpenCV与FFmpeg:开源的计算机视觉和多媒体处理库,用于图像和视频的处理与分析。
三、基础架构与资源管理层
该层位于模子与数据处理层之下,负责提供基础架构支持和资源管理服务。它包罗:
- Docker容器:用于封装和部署算法模子,提高模子的可移植性和可扩展性。通过Docker,可以轻松地在不同环境中部署和运行模子。
- K8S:作为容器编排系统,Kubernetes负责管理和调度容器化应用,提供自动化部署、扩展、管理和运行容器化应用步伐的本领。
- 操作系统内核(Kernel):操作系统的核心部分,负责管理硬件资源、提供历程调度、内存管理等功能。
- 装备驱动步伐(Device Drivers):与硬件装备通讯的接口,使操作系统能够控制和管理这些装备。
- NGINX:高性能的HTTP和反向代理服务器,用于处理网络通讯和负载均衡。
- BootLoader:引导加载步伐,负责系统启动时加载操作系统内核。
- 数据库(MySQL):作为数据存储的核心,MySQL负责存储和管理系统所需的所有结构化数据,如地区信息,装备信息,算法信息,模子信息,告警信息等。
- Redis:高性能的键值存储数据库,用于缓存数据、热数据存储,劫难恢复和崩溃拉起业务。
各个模块业务松耦合
松耦合是一种软件计划原则,旨在降低系统组件之间的依靠关系,使系统更加可扩展和易维护(感兴趣的可以阅读 :你的代码是否按照高内聚、低耦合的原则来计划的?-CSDN博客)。实在从我们的业务需求上不难划分出各个模块来,我使用了服务化计划的思绪:将应用拆分为多个独立的服务,每个服务负责特定的功能。这些服务通过轻量级的通讯协议(如REST、gRPC)进行交互,而不依靠于彼此的具体实现。
因此我拆分为三个主要的服务: AI管理平台(Platform),算法后处理服务(AIEC)和算法处理服务(AIInterface),此中AIEC和AIInterface都是运行在嵌入式盒子大概服务器上的,这就要求AIEC和AIInterface适配不同的厂家硬件。
Platform:
平台的功能相对简单,主要是提供人机交互界面,操作人员登陆后对相机,算法,模子等资源进行设置和升级管理:
吸收和检察算法告警信息
根据告警汗青记录进行告警的态势猜测:
AIEC:
AIEC现实上充当的是算法调度器的角色,由于它运行在arm嵌入式盒子大概服务器上,那么他就需要适配多种硬件厂商来满足不同场景下的业务需要。
思量到适配不同硬件,而且要对算法调度,因此我们抽象化了硬件接口:
- type IBaseDevice interface {
- GetDeviceType() string //获取设备类型
- Init() error //启动的时候初始化一些配置文件
- StartAI() error //开启算法接口(必须初始化一些配置检查之类的)
- StopAI() error //关闭算法接口
- StatDeep() error //检查ai算法服务状态
- GetDeepState() (bool, string, error)
- ReloadAIProcess() error //重启ai算法服务
- Terminal() error //关闭ai算法服务
- Stop() error //暂停ai算法服务
- }
复制代码 对于不同的硬件比如华为,算能,英伟达只需要实现以上接口即可。
同样的,由于AIEC需要对算法服务的检测结果要进一步的分析推理,因此我们要对不同的算法进行抽象,现实上就只有一个接口方法DetectAnlysis(result *AIResultReq)而已,通过调用接口方法可以获取到AIInterface的检测结果,并根据不同的算法范例进行各自的推理分析。
比如离岗检测,需要统计某一个时间段内,目的地区是否有人员识别结果,一旦识别到人,则重置识别状态和结果,否则到达时间段的阈值则触发告警。
怎样调度的简要阐明:
我们通过RESTful接口吸收来自Platform的相机和算法更新信息,处理这些请求,并根据相机和算法调度相应的检测算法服务。之后,我们异步获取算法返回的检测结果,进行进一步的分析推理,终极将需要告警的信息推送回Platform。
AIInterface:
同样的,AIInterface也需要根据不同硬件装备(常见的SE算能,英伟达Nvidia,华为Huawei盒子、服务器)来适配做算法处理,根据视频流信息和算法信息进行模子 推理,通过rabbitmq消息队列将算法检测结果通知给AIEC。
由于不同的硬件资源他们的算力不同,因此对于不同的硬件,我们也接纳多历程多模子计算,单历程多模子等多种灵活的设置方案来办理算力和计算性能上的问题。
模子训练
我们都知道,模子训练在机器学习和人工智能领域中扮演着至关重要的角色。它不但是构建有用模子的基础,也是实现高准确性和可靠性的关键过程。
我们大多数模子训练都是通过自己收罗数据->人工标注->模子训练这样的方式来进行的,以人以及穿戴装备为例,大多数据集我们都使用了公开的数据集。
我就以安全帽识别为例,我们使用了 Kaggle的数据集:Safety Helmet Detection | KaggleSafety Helmet Detection | Kaggle
该数据集包罗 5000 个带有边界框注释的图像,格式为 PASCAL VOC 格式且主要针对以下 3 个类别:helmet, head person,存在数据不平衡:
从上面绘制的直方图中可以清晰看到,数据集中几乎 75% 的出现都属于头盔类,这不是什么功德。
虽然这个数据集从来都不是我们的首选,但这是我们在互联网上能找到的唯一像样的数据集。我们将整个数据集按照 8:1:1 的比例划分为训练集、验证集和测试集。上述集合分别包罗 4000、500 和 500 张图像。
我们使用了预先训练的Faster R-CNN 模子,并以 Resnet50 为主 ,并根据该数据集的训练分割进行了微调。让我们看看训练模子的代码片段:
用于训练迭代的函数定义:
- from tqdm import tqdm
- def train(train_data_loader, model):
- print('Training...')
- global train_itr
- global train_loss_list
- # initialize tqdm progress bar
- prog_bar = tqdm(train_data_loader, total=len(train_data_loader))
- # criterion = torch.nn.BCELoss(size_average=True)
- for i, data in enumerate(prog_bar):
- optimizer.zero_grad() #####
- images, targets = data #####
- images = list(image.to(DEVICE) for image in images) #####
- targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets] #####
- loss_dict = model(images, targets) #####
- losses = sum(loss for loss in loss_dict.values()) #####
- loss_value = losses.item()
- train_loss_list.append(loss_value)
- train_loss_hist.send(loss_value)
- losses.backward() #####
- optimizer.step() #####
- train_itr += 1 #####
- # update the loss value beside the progress bar for each iteration
- prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
- return train_loss_list
复制代码 验证迭代的函数定义:
- # function for running validation iterations
- def validate(valid_data_loader, model):
- print('Validating...')
- global val_itr
- global val_loss_list
- # initialize tqdm progress bar
- prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))
- for i, data in enumerate(prog_bar):
- images, targets = data
- images = list(image.to(DEVICE) for image in images)
- targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
- with torch.no_grad():
- loss_dict = model(images, targets)
- losses = sum(loss for loss in loss_dict.values())
- loss_value = losses.item()
- val_loss_list.append(loss_value)
- val_loss_hist.send(loss_value)
- val_itr += 1
- # update the loss value beside the progress bar for each iteration
- prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
- return val_loss_list
复制代码 平均类用于记录训练和验证损失
- class Averager:
- def __init__(self):
- self.current_total = 0.0
- self.iterations = 0.0
- def send(self, value):
- self.current_total += value
- self.iterations += 1
- @property
- def value(self):
- if self.iterations == 0:
- return 0
- else:
- return 1.0 * self.current_total / self.iterations
- def reset(self):
- self.current_total = 0.0
- self.iterations = 0.0
- class SaveBestModel:
- """
- Class to save the best model while training. If the current epoch's
- validation loss is less than the previous least less, then save the
- model state.
- """
- def __init__(self, best_valid_loss=float('inf')):
- self.best_valid_loss = best_valid_loss
- def __call__(self, current_valid_loss, epoch, model, optimizer):
- if current_valid_loss < self.best_valid_loss:
- self.best_valid_loss = current_valid_loss
- print(f"\nBest validation loss: {self.best_valid_loss:.3f}")
- print(f"\nSaving best model for epoch: {epoch+1}\n")
- torch.save({
- 'epoch': epoch+1,
- 'model_state_dict': model.state_dict(),
- 'optimizer_state_dict': optimizer.state_dict(),
- 'scheduler_state_dict': scheduler.state_dict(),
- }, '/content/drive/My Drive/helmet_dataset/savedmodel/best_model.pth')
- # function to save the model after each epoch and after training ends
- def save_model(epoch, model, optimizer):
- """
- Function to save the trained model till current epoch, or whenver called
- """
- torch.save({
- 'epoch': epoch+1,
- 'model_state_dict': model.state_dict(),
- 'optimizer_state_dict': optimizer.state_dict(),
- 'scheduler_state_dict': scheduler.state_dict(),
- }, '/content/drive/My Drive/helmet_dataset/savedmodel/last_model.pth')
复制代码 训练循环
- train_loss_hist = Averager()
- val_loss_hist = Averager()
- train_itr = 1
- val_itr = 1
- # train and validation loss lists to store loss values of all
- # iterations till end and plot graphs for all iterations
- train_loss_list = []
- val_loss_list = []
- # initialize SaveBestModel class
- save_best_model = SaveBestModel()
- NUM_EPOCHS = 16 # number of epochs to train for
- epoch=0
- # start the training epochs
- for epoch in range(epoch, NUM_EPOCHS):
- print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")
- # reset the training and validation loss histories for the current epoch
- train_loss_hist.reset()
- val_loss_hist.reset()
- train_loss = train(train_loader, model)
- val_loss = validate(valid_loader, model)
- print(scheduler.get_last_lr()) # this prints the lr used in training that specific epoch
- scheduler.step()
- print(f"Epoch #{epoch+1} train loss: {train_loss_hist.value:.3f}")
- print(f"Epoch #{epoch+1} validation loss: {val_loss_hist.value:.3f}")
- # save the best model till now if we have the least loss in the current epoch
- save_best_model(val_loss_hist.value, epoch, model, optimizer)
- # save the current epoch model
- save_model(epoch, model, optimizer)
复制代码 这些代码片段将指导我们完成用于该模子的整个训练过程。我们首先定义了用于迭代训练和验证数据集的函数。然后定义了用于记录损失和保存最佳和最后模子的类。然后我们有了训练循环的终极代码片段。
终极经过训练的模子图如下:

我们测试下训练结果:
设置置信度为0.65,选择一张测试图片,我们来看下检测结果

检测结果信息:
{
"predictions": [
{
"x": 244.5,
"y": 66.5,
"width": 111,
"height": 79,
"confidence": 0.891,
"class": "helmet",
"class_id": 1,
"detection_id": "d6ca5493-b249-44b2-b857-56d98057051d"
},
{
"x": 189,
"y": 366,
"width": 40,
"height": 40,
"confidence": 0.83,
"class": "boots",
"class_id": 0,
"detection_id": "24dc2b28-235d-42ea-9c17-b4583ee75295"
},
{
"x": 198,
"y": 237,
"width": 312,
"height": 388,
"confidence": 0.731,
"class": "person",
"class_id": 5,
"detection_id": "7c5d5d4f-931e-4486-ac98-b56a8984ff80"
}
]
}
硬件适配
我将从硬件性能、开发 SDK 和适用场景三个方面,对 SE 算能盒子、英伟达 Nvidia 盒子(如 Jetson 系列)以及华为盒子(如昇腾系列)进行具体对比。
硬件对比
特性SE 算能盒子英伟达 Nvidia 盒子华为盒子处理器架构专用 AI 加快芯片(如 BM1684X TPU)Tegra 系列 SoC(如 Jetson Xavier NX)昇腾 AI 芯片(如昇腾310/910)算力INT8 算力高达 32 TOPS(BM1684X)FP16 算力高达 32 TOPS(Jetson AGX Xavier)FP16 算力高达 256 TOPS(昇腾910)功耗低功耗计划,范例功耗 15W-30W功耗范围 10W-30W(根据型号不同)功耗范围 30W-310W(根据型号不同)存储与内存DDR4/DDR5 内存,eMMC 存储LPDDR4 内存,eMMC 或 NVMe 存储HBM 高带宽内存(如 HBM2e)接口支持丰富的 I/O 接口(HDMI、USB、以太网等)支持 PCIe、USB、HDMI、GPIO 等支持 PCIe、以太网、NVLink 等高性能接口适用场景视频分析、工业检测、边沿 AI 推理自动驾驶、机器人、边沿 AI 推理云端 AI 训练、大规模推理、边沿计算 硬件总结
- SE 算能盒子:专注于低功耗、高效的边沿 AI 推理,恰当工业和视频分析场景。
- 英伟达 Nvidia 盒子:通用性强,支持从边沿推理到图形处理的多种任务。
- 华为盒子:以高性能为主,恰当云端 AI 训练和大规模推理任务。
SDK 对比
特性SE 算能盒子 SDK英伟达 Nvidia SDK华为盒子 SDK开发工具Sophon SDK,支持模子优化、推理加快JetPack SDK,包罗 CUDA、cuDNN、TensorRT 等工具MindSpore 和 CANN,支持训练与推理支持框架TensorFlow、PyTorch、Caffe 等TensorFlow、PyTorch、ONNX 等TensorFlow、PyTorch、MindSpore 等模子优化BMCompiler 和 BMOptimizerTensorRT 进行模子优化和推理加快CANN 提供模子编译与优化编程语言支持C++、PythonC++、PythonC++、Python文档与社区支持文档较完善,社区支持相对较少文档丰富,社区活跃,支持广泛文档完善,社区支持渐渐加强 SDK 总结
- SE 算能盒子:Sophon SDK 针对其专用硬件进行了深度优化,恰当特定 AI 推理任务。
- 英伟达 Nvidia 盒子:JetPack SDK 提供全面的开发工具,生态系统成熟,恰当多样化开发需求。
- 华为盒子:MindSpore 和 CANN 提供从训练到推理的完整支持,恰当高性能 AI 应用。
华为适配
从刚才的硬件,SDK对比结果不丢脸出,不同硬件装备的适配方案是各有不同的,这里我就以以华为服务器为例, 华为的 MindX 组件 是昇腾 AI 生态系统中的重要部分,旨在简化 AI 应用的开发和部署。通过与 pipeline(数据管道)相结合,MindX 提供了一种高效的方式来处理数据流和任务流,从而实现复杂的 AI 应用场景。 开发文档大家可以参考: 简介-mxVision 用户指南-智能视频分析-MindSDK5.0.0开发文档-昇腾社区
MindX 提供了多个功能模块,这些模块可以通过 pipeline 进行灵活组合。例如:
- 数据解码模块:负责将输入数据(如图片或视频流)解码为可处理的格式。
- 推理模块:调用 AI 模子进行推理。
- 后处理模块:对推理结果进行处理,如生成可视化输出或统计分析。
- 设置驱动:pipeline 的设置文件定义了各模块的执行顺序、参数设置以及模块间的数据运动。
1.设置pipeline
例如设置我们的pipeline信息如下:
- {
- "detection": {
- "appsink0": {
- "factory": "appsink"
- },
- "mxpi_imageresize0": {
- "factory": "mxpi_imageresize",
- "next": "mxpi_modelinfer0",
- "props": {
- "interpolation": "1",
- "resizeType": "Resizer_KeepAspectRatio_Fit",
- "paddingType": "Padding_NO"
- }
- },
- "mxpi_modelinfer0": {
- "factory": "mxpi_modelinfer",
- "next": "appsink0",
- "props": {
- "dataSource": "mxpi_imageresize0",
- "deviceId": "0",
- "labelPath": "/data0/algorithm/hvhpp/model/hvhpp.names",
- "modelPath": "/data0/algorithm/hvhpp/model/hvhpp.om",
- "postProcessConfigPath": "/data0/algorithm/hvhpp/model/yolov5.cfg",
- "postProcessLibPath": "/data0/algorithm/mxVision-5.0.1/lib/libMpYOLOv5PostProcessor.so"
- }
- },
- "mxpi_parallel2serial0": {
- "factory": "mxpi_parallel2serial",
- "next": "mxpi_imageresize0",
- "props": {
- "dataSource": "mxpi_videodecoder1, mxpi_videodecoder22, mxpi_videodecoder14, mxpi_videodecoder3, mxpi_videodecoder10, mxpi_videodecoder15, mxpi_videodecoder4, mxpi_videodecoder24, mxpi_videodecoder5, mxpi_videodecoder21, mxpi_videodecoder0, mxpi_videodecoder9, mxpi_videodecoder2"
- }
- },
- "mxpi_rtspsrc0": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder0",
- "props": {
- "channelId": "0",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.81:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc1": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder1",
- "props": {
- "channelId": "1",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.83:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc10": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder10",
- "props": {
- "channelId": "10",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.93:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc14": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder14",
- "props": {
- "channelId": "14",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.90:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc15": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder15",
- "props": {
- "channelId": "15",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.187:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc2": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder2",
- "props": {
- "channelId": "2",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.82:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc21": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder21",
- "props": {
- "channelId": "21",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.225:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc22": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder22",
- "props": {
- "channelId": "22",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.2.95:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc24": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder24",
- "props": {
- "channelId": "24",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.1.105:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc3": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder3",
- "props": {
- "channelId": "3",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.1.102:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc4": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder4",
- "props": {
- "channelId": "4",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.1.101:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc5": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder5",
- "props": {
- "channelId": "5",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.1.100:554/media1/video2/video"
- }
- },
- "mxpi_rtspsrc9": {
- "factory": "mxpi_rtspsrc",
- "next": "mxpi_videodecoder9",
- "props": {
- "channelId": "9",
- "rtspUrl": "rtsp://admin:a!123456789@192.168.1.100:554/media1/video2/video"
- }
- },
- "mxpi_videodecoder0": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:10",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "0"
- }
- },
- "mxpi_videodecoder1": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:0",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "1"
- }
- },
- "mxpi_videodecoder10": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:4",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "10"
- }
- },
- "mxpi_videodecoder14": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:2",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "14"
- }
- },
- "mxpi_videodecoder15": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:5",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "15"
- }
- },
- "mxpi_videodecoder2": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:12",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "2"
- }
- },
- "mxpi_videodecoder21": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:9",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "21"
- }
- },
- "mxpi_videodecoder22": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:1",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "22"
- }
- },
- "mxpi_videodecoder24": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:7",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "24"
- }
- },
- "mxpi_videodecoder3": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:3",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "3"
- }
- },
- "mxpi_videodecoder4": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:6",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "4"
- }
- },
- "mxpi_videodecoder5": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:8",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "5"
- }
- },
- "mxpi_videodecoder9": {
- "factory": "mxpi_videodecoder",
- "next": "mxpi_parallel2serial0:11",
- "props": {
- "deviceId": "0",
- "skipFrame": "30",
- "inputVideoFormat": "H264",
- "outputImageFormat": "YUV420SP_NV12",
- "vdecChannelId": "9"
- }
- },
- "stream_config": {
- "deviceId": "0"
- }
- }
- }
复制代码 2.设置信息:
模子训练om模子是华为昇腾AI处理器支持的离线推理模子格式,因此我们使用om模子和对应的模子文件设置。
3.加载模子启动算法检测
启动算法脚本python编写:
- import os
- import cv2
- import redis
- import json
- import time
- import base64
- import struct
- import numpy as np
- from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
- import MxpiDataType_pb2 as MxpiDataType
- # redis
- r = redis.StrictRedis(host='localhost', port=6379)
- # The following belongs to the SDK Process
- streamManagerApi = StreamManagerApi()
- # init stream manager
- ret = streamManagerApi.InitManager()
- if ret != 0:
- print("Failed to init Stream manager, ret=%s" % str(ret))
- exit()
- # create streams by pipeline config file
- # load pipline
- cur_dir = os.path.dirname(os.path.abspath(__file__))
- print(cur_dir)
- cur_dir = cur_dir + "/pipeline/video.pipeline"
- print(cur_dir)
- with open(cur_dir, 'rb') as f:
- pipelineStr = f.read()
- ret = streamManagerApi.CreateMultipleStreams(pipelineStr)
- # Print error message
- if ret != 0:
- print("Failed to create Stream, ret=%s" % str(ret))
- # Stream name
- streamName = b'detection'
- # Obtain the inference result by specifying streamName and keyVec
- # The data that needs to be obtained is searched by the plug-in name
- keys = [b"ReservedFrameInfo", b"mxpi_modelinfer0", b"mxpi_parallel2serial0"]
- keyVec = StringVector()
- for key in keys:
- keyVec.push_back(key)
- while True:
- # Get data through GetResult
- infer_result = streamManagerApi.GetResult(streamName, b'appsink0', keyVec)
- # Determine whether the output is empty
- if infer_result.metadataVec.size() == 0:
- print("infer_result is null")
- # continue
- # Frame information structure
- frameList = MxpiDataType.MxpiFrameInfo()
- frameList.ParseFromString(infer_result.metadataVec[0].serializedMetadata)
- # Objectpostprocessor information
- objectList = MxpiDataType.MxpiObjectList()
- objectList.ParseFromString(infer_result.metadataVec[1].serializedMetadata)
- # Videodecoder information
- visionList = MxpiDataType.MxpiVisionList()
- visionList.ParseFromString(infer_result.metadataVec[2].serializedMetadata)
- vision_data = visionList.visionVec[0].visionData.dataStr
- visionInfo = visionList.visionVec[0].visionInfo
- # # cv2 func YUV to BGR
- # YUV_BYTES_NU = 3
- # YUV_BYTES_DE = 2
- # img_yuv = np.frombuffer(vision_data, np.uint8)
- # # reshape
- # img_bgr = img_yuv.reshape(visionInfo.heightAligned * YUV_BYTES_NU // YUV_BYTES_DE, visionInfo.widthAligned)
- # # Color gamut conversion
- # img = cv2.cvtColor(img_bgr, getattr(cv2, "COLOR_YUV2BGR_NV12"))
- Id = frameList.frameId
- bboxes = []
- # fire or not
- # if len(objectList.objectVec) == 0:
- # continue
- json_str = '{' + '"StreamID":"{}","list":['.format(frameList.channelId)
- # json_str = '{' + '"StreamID":"1679044571","list":['
- print(len(objectList.objectVec))
- list = ""
- for i in range(len(objectList.objectVec)):
- # get ObjectList
- results = objectList.objectVec[i]
- bboxes = {'x0': int(results.x0),
- 'x1': int(results.x1),
- 'y0': int(results.y0),
- 'y1': int(results.y1),
- 'confidence': round(results.classVec[0].confidence, 4),
- 'text': results.classVec[0].className}
- text = "{}{}".format(str(bboxes['confidence']), " ")
- list = list + '{' + '"x1":{},"y1":{},"x2":{},"y2":{},"data":"NULL","class":"{}","confidence":{},"track_id":-1'.format(bboxes['x0'], bboxes['y0'], bboxes['x1'], bboxes['y1'], bboxes['text'], bboxes['confidence']) + '}'
- if(i < len(objectList.objectVec) - 1):
- list = list + ','
- # Draw rectangle
- # for item in bboxes['text']:
- # text += item
- # cv2.putText(img, text, (bboxes['x0'] + 10, bboxes['y0'] + 10), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
- # cv2.rectangle(img, (bboxes['x0'], bboxes['y0']), (bboxes['x1'], bboxes['y1']), (0, 0, 255), 4)
- # send_json_time = int(time.time())
- send_json_time = int(time.time() * 1000)
- if len(objectList.objectVec) == 0:
- json_str = json_str + ']}'
- else:
- json_str = json_str + list + ']}'
- list_buf = '{' + '"Long_j":{},"width":{},"height":{},"timestamp":{},"json":{},"long_p":{}'.format(len(json_str), visionInfo.width, visionInfo.height, send_json_time, json_str, len(vision_data)) + '}'
- # 计算list_buf不包含图片的大小,(大小头)小头保存长度
- len_little = len(list_buf).to_bytes(4, byteorder='little', signed=True)
- #转成字节
- list_buf = bytes(list_buf,'UTF-8')
- # 拼接图片内容
- print(list_buf)
- print("\n")
- list_buf = len_little + list_buf + vision_data
- r.publish("mychannel",list_buf)
- # # save picture
- # Id = frameList.frameId
- # result_path = "./result/"
- # if os.path.exists(result_path) != 1:
- # os.makedirs("./result/")
- # oringe_imgfile = './result/image' + '-' + str(Id) + '.jpg'
- # print("Warning! Fire or smoke detected")
- # print("Result save in ",oringe_imgfile)
- # cv2.imwrite(oringe_imgfile, img)
- # Destroy All Streams
- streamManagerApi.DestroyAllStreams()
复制代码 执行算法检测:

服务自启和崩溃拉起
崩溃拉起和服务自启是当代软件系统中不可或缺的机制,通过自动监控和恢复,可以显着提高系统的可用性和稳定性。服务在运行过程中大概会存在各种各样的场景,比如异常断电,断网,硬件故障等,崩溃拉起和服务自启可以确保应用步伐高可用性和稳定性。
关于高可用,容灾恢复问题,我曾经在CSDN里的这篇文章里做过方案,大家可以通过阅读这篇文章来寻找答案:
变形记---容灾恢复(一),异常崩溃引发服务器丢档或无法正常运行-CSDN博客
这里我只阐明AIEC的服务自启和崩溃拉起的机制,由于Platform和算法AIInterface的相对较为简单,完满是靠操作系统的服务管理设置项大概由AIEC来控制的。
AIEC自启动机制
aiec承接着非常重要的算法资源调度工作,当aiec服务发生崩溃后,服务假如不再正常运行,则会给煤矿生产带来非常重大的安全隐患和损失,我们使用两种方式来确保aiec边沿计算的可用性:
- 通过Platform平台可定时检测AIEC的心跳来监控其运行状态,假如历程一旦崩溃则触发重启操作。
- aiec自身有服务自启机制,通过借助linux的systemd大概init.d的服务器管理设置项来设置启动战略。

关于aiec的自启动机制,我提供了以下自启动脚本:
- #!/bin/bash
- ## 创建systemctl服务配置文件
- ## $1 服务名称,
- ## $2 pwd
- ## $3程序位置
- ## $4 程序文件
- ## $5名字
- ## $6 after
- ## $7用户
- function createService() {
- {
- echo "[Unit]"
- echo "Description=$5"
- #echo "After=$6"
- echo "Requires=aiec.service mysql.service"
- echo "Wants=aiec.service mysql.service"
- echo ""
- echo "[Service]"
- echo "User=$7"
- echo "Restart=always"
- echo "Group=$7"
- echo "Type=forking"
- echo "ExecStart=/bin/bash $2/$3/$4 start"
- echo "ExecStop=/bin/bash $2/$3/$4 stop"
- echo "ExecReload=/bin/bash $2/$3/$4 restart"
- echo ""
- echo "[Install]"
- echo "WantedBy=multi-user.target"
- } >"$1"
- }
- ## 创建systemctl服务
- function installService() {
- sudo systemctl daemon-reload
- systemctl enable "$1"
- systemctl start "$1"
- }
- ## 创建服务启动、关闭、重启命令脚本
- function createCommandScript() {
- # 脚本文件名
- scriptName="$1"
- # 服务目录
- serverPath="$(pwd)/$2"
- # 服务名称
- serverName="$3"
- # 服务启动命令
- serverStartCommand="$4"
- # systemctl服务名称
- systemctlName="$5"
- {
- echo "#!/bin/bash"
- echo ""
- echo "case \$1 in"
- echo "start)"
- echo " cd $serverPath && $serverStartCommand"
- echo " ;;"
- echo "stop)"
- echo " if [[ -n \$(pgrep "$serverName") ]]; then"
- echo " pgrep $serverName | sudo xargs kill -9"
- echo " fi"
- echo " ;;"
- echo "restart)"
- echo " if [[ -n \$(pgrep "$serverName") ]]; then"
- echo " pgrep $serverName | sudo xargs kill -9"
- echo " fi"
- echo " cd $serverPath && $serverStartCommand"
- echo " ;;"
- echo "reload)"
- echo " systemctl restart $systemctlName"
- echo " ;;"
- echo "esac"
- } >"$scriptName"
- chmod 777 "$scriptName"
- }
- ## 创建对应服务启动、关闭、重启命令脚本
- # aiec启动脚本
- createCommandScript "./aiec.sh" "" "aiec" "nohup ./aiec >/dev/null &" "aiec.service"
- ## 创建aiec的服务
- createService "/etc/systemd/system/aiec.service" "$(pwd)" "" "aiec.sh" "aiec.service" "redis.service" "root"
- ## 设置对应服务自启动
- installService "aiec.service"
复制代码 执行shell脚本后会自动创建一个aiec服务,并启动aiec服务。

AIEC崩溃拉起机制
当一个服务发生崩溃而自启动,假如没有一个安全的崩溃拉起机制,大概会导致多种严肃问题:
- 未保存的数据丢失:服务崩溃时,任何未保存的重要的检测结果或临时数据都大概会丢失,导致AIEC自动后依靠的数据找不到,算法分析出现异常。
- 影响上下文分析:假如服务器启动后部分算法的分析需要借助上下文的临时数据来分析这些告警数据,假如崩溃之前的临时数据丢失就很有大概造成数据状态不一致,影响检测告警结果。
这里我给大家提供几个我专栏里的关于崩溃和缓存的文章:
变形记---容灾恢复(一),异常崩溃引发服务器丢档或无法正常运行-CSDN博客
使用缓存战略优化系统性能:综合指南
掌握服务器缓存战略
我们会把每次关键的检测结果缓存到Redis中,当崩溃拉起的时候,就能获取到崩溃前的记录,这样就能根据上下文继续进行算法推理分析了。
后记
我们团队每个人的每一小步都能为人工智能行业和煤矿领域带来积极的进步。纵然这些步子微小,缓慢,但我相信,总会有人在默默推动着这一历程向前发展。
每一个小的积极和实验,都是为将来的更大突破奠定基础。正是这些看似微不敷道的希望,汇聚起来才能形成推动整个行业前行的强盛动力。无论前路多么艰难,我都愿意坚定地迈出这一步,为实现更高效、更安全的煤矿作业和更智能的AI应用贡献自己的气力。
最后,我相信人工智能不但能够推动煤矿安全检测的发展,也能促进越来越多行业的高效安全生产,助力其在各个领域扎根生长。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |