EMQX安装
EMQX服务器安装
安装文档,见链接不另外写
https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
启动 EMQX
启动为一个 systemd 服务:
- sudo systemctl start emqx
复制代码 在windows安装客户端
在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 欣赏器客户端,但是经我测试没有成功,似乎有bug.
发起使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的底子上,也能更快的开发和调试 MQTT 服务与应用。
由于是后期被写的博文,图是借官方的。请自行区分一下。
平台安装后的地点
1,平台的地点
- http://127.0.0.1:18083
配景登录 用户名:test 暗码:test
Laravel中处理MQTT订阅
1,安装MQTT客户端库
在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:
- composer require php-mqtt/client
复制代码 2, 新建command文件
文件路径:app/Console/Commands/MqttClientCommand.php
这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。
MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT毗连。
MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。
- <?php
- namespace App\Console\Commands;
- use App\Http\Controllers\Wxapi\DeviceReportController;
- use Illuminate\Console\Command;
- use Illuminate\Support\Facades\DB;
- use Simps\MQTT\Protocol\Types;
- use Simps\MQTT\Protocol\V5;
- use Simps\MQTT\Tools\Common;
- use Simps\MQTT\Client;
- use Simps\MQTT\Config\ClientConfig;
- use Simps\MQTT\Hex\ReasonCode;
- use Swoole\Coroutine;
- use Illuminate\Support\Facades\Redis;
- class MQTTUserConfig
- {
- const SIMPS_MQTT_REMOTE_HOST = '*';
- const SIMPS_MQTT_PORT = 1883;
- const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
- const SIMPS_MQTT_USER = 'test*';
- const SIMPS_MQTT_PASSWORD = 'test*';
- }
- class MqttClientCommand extends Command
- {
- protected $signature = 'mqtt:handle {param1}';
- protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
- protected $mqtt ;
- const SWOOLE_MQTT_CONFIG = [
- 'open_mqtt_protocol' => true,
- 'package_max_length' => 2 * 1024 * 1024,
- 'connect_timeout' => 5.0,
- 'write_timeout' => 5.0,
- 'read_timeout' => 5.0,
- ];
- //模拟设备
- const CLiENT_IDs = [
- 'mqttx_devA',
- 'mqttx_devB',
- 'mqttx_devC',
- 'mqttx_devD'
- ];
- public function __construct()
- {
- parent::__construct();
- }
- public function handle()
- {
- $param1 =$this->argument('param1');
- // $param2 =$this->argument('param2');
- if ($param1=='subscribe') {
- $this->info('启动订阅...');
- $this->subscribeMqtt();
- } elseif ($param1=='public') {
- $this->info('启动发布...');
- $this->publishMQTT();
- }
- echo '\r\n\r\n分配工作执行完成!!!';
- }
- protected function getTestMQTT5ConnectConfig()
- {
- $config = new ClientConfig();
- $UserConfig = new MQTTUserConfig();
- return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
- ->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
- ->setClientId(Client::genClientID())
- ->setKeepAlive(10)
- ->setDelay(3000) // 3s
- ->setMaxAttempts(5)
- ->setProperties([
- 'session_expiry_interval' => 60,
- 'receive_maximum' => 65535,
- 'topic_alias_maximum' => 65535,
- ])
- ->setProtocolLevel(5)
- ->setSwooleConfig( [
- 'open_mqtt_protocol' => true,
- 'package_max_length' => 2 * 1024 * 1024,
- 'connect_timeout' => 5.0,
- 'write_timeout' => 5.0,
- 'read_timeout' => 5.0,
- ]);
- }
- private function heartbeat($message) {
- if ($message) {
- parse_str($message,$array);
- $device = $array['imei'];
- $hash = ':mqtt:heartbeat:online'.":{$device}";
- Redis::expire($hash,30); ##30s有效
- Redis::sAdd($hash,1);
- }
- }
- /*
- * 订阅
- * private function subscribeMqtt(){
- Coroutine\run(function () {
- $client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
- ....
- */
- private function subscribeMqtt(){
- Coroutine\run(function () {
- $UserConfig = new MQTTUserConfig();
- $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
- $this->getTestMQTT5ConnectConfig());
- $will = [
- 'topic' => 'simps-mqtt/dinweiyi/delete',
- 'qos' => 1,
- 'retain' => 0,
- 'message' => 'byebye',
- 'properties' => [
- 'will_delay_interval' => 60,
- 'message_expiry_interval' => 60,
- 'content_type' => 'test',
- 'payload_format_indicator' => true, // false 0 1
- ],
- ];
- $client->connect(true, $will);
- $topics['simps-mqtt/dinweiyi/subscribe_message'] = [
- 'qos' => 2,
- 'no_local' => true,
- 'retain_as_published' => true,
- 'retain_handling' => 2,
- ];
- $res = $client->subscribe($topics);
- $timeSincePing = time();
- var_dump($res);
- echo '\r\n\r\n connect success !!!';
- while (true) {
- try {
- $buffer = $client->recv();
- $message = null;
- if ($buffer && $buffer !== true) {
- $message = $buffer["message"];
- // QoS1 PUBACK
- if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
- $client->send(
- [
- 'type' => Types::PUBACK,
- 'message_id' => $buffer['message_id'],
- ],
- false
- );
- }
- if ($buffer['type'] === Types::DISCONNECT) {
- echo sprintf(
- "Broker is disconnected, The reason is %s [%d]\n",
- ReasonCode::getReasonPhrase($buffer['code']),
- $buffer['code']
- );
- $client->close($buffer['code']);
- break;
- }
- $reportObj = new DeviceReportController();
- $ret = $reportObj->store($message);
- var_dump("182>>>",$ret);
- unset($reportObj);
- }
- if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
- $buffer = $client->ping();
- if ($buffer) {
- echo 'send ping success ...' ;
- $this->heartbeat($message);
- $timeSincePing = time();
- }
- }
- } catch (\Throwable $e) {
- throw $e;
- }
- }
- });
- }
- protected function getMessage() {
- $client_ids = [
- 'mqttx_devA',
- // 'mqttx_devB',
- 'mqttx_devC',
- 'mqttx_devD'
- ];
- $message = [];
- $message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
- $message['time'] = time();
- $message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
- return json_encode($message);
- }
- /*
- * 发布
- */
- public function publishMQTT() {
- Coroutine\run(function () {
- $UserConfig = new MQTTUserConfig();
- $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
- $this->getTestMQTT5ConnectConfig());
- $client->connect();
- while (true) {
- $message = $this->getMessage();
- $response = $client->publish(
- 'simps-mqtt/user/subscribe_message',
- $message,
- 1,
- 0,
- 0,
- [
- 'topic_alias' => 1,
- 'message_expiry_interval' => 12,
- ]
- );
- var_dump( 'publishMQTT>>>',$message);
- Coroutine::sleep(1);
- }
- });
- }
- }
复制代码 3, 代码流程图
使用Mermaid语法描述的上述PHP代码的流程图:
流程分析:
- 开始:程序启动。
- 构造函数 __construct:初始化命令行工具。
- handle 方法:处理命令行输入。
- param1 参数:根据输入的参数决定是订阅还是发布。
- 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
- 调用 publishMQTT:如果参数是public,则调用此方法。
- Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
- 创建 MQTT 客户端并毗连:创建MQTT客户端并毗连到服务器。
- 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
- 订阅主题:订阅特定的MQTT主题。
- 吸取消息:持续监听并吸取消息。
- 处理消息:对吸取到的消息进行处理。
- 心跳函数 heartbeat:检查设备心跳。
- 存储消息:将消息存储到数据库或其他存储系统。
- 是否断开毗连:检查客户端是否断开毗连。
- 关闭毗连:如果断开,则关闭毗连。
- Coroutine 运行 publishMQTT:在协程中运行发布方法。
- 创建 MQTT 客户端并毗连:创建MQTT客户端并毗连到服务器。
- 循环发布消息:循环发布消息。
- 获取测试消息:生成要发布的测试消息。
- 发布消息:将消息发布到MQTT服务器。
- 结束:程序结束。
配景常驻运行
1,php artisan命令在配景运行
- 打开您的终端或SSH到您的服务器。
- 使用nohup命令运行您的Artisan命令进行测试,如下所示
- php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe
复制代码 3.命令行的php的版本与web php的版本号要同等
2,使用宝塔的保卫历程开启历程
也可以添加保卫历程。
以上2种最好是只选一个
测试
打开emqx web ,在欣赏器输入http://127.0.0.0.1:18083/#/websocket
主题:
主题跟php代码内的主题是同等的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容
检查发送的结果
打开数据库,检查device_report表是否成功。成功应下图所示:
实操完成
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |