MQTT客户端实战:从毗连到通信。具体分析MQTT客户端和MQTT代理进行通信 ...

打印 上一主题 下一主题

主题 1029|帖子 1029|积分 3087


EMQX安装

EMQX服务器安装

安装文档,见链接不另外写
https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
启动 EMQX

启动为一个 systemd 服务:
  1. sudo systemctl start emqx
复制代码
在windows安装客户端

在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 欣赏器客户端,但是经我测试没有成功,似乎有bug.
发起使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的底子上,也能更快的开发和调试 MQTT 服务与应用。

由于是后期被写的博文,图是借官方的。请自行区分一下。
平台安装后的地点

   1,平台的地点
  

  • http://127.0.0.1:18083
    配景登录 用户名:test 暗码:test
  Laravel中处理MQTT订阅

1,安装MQTT客户端库

在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:
  1. composer require php-mqtt/client
复制代码
2, 新建command文件

文件路径:app/Console/Commands/MqttClientCommand.php
这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。
MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT毗连。
MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Http\Controllers\Wxapi\DeviceReportController;
  4. use Illuminate\Console\Command;
  5. use Illuminate\Support\Facades\DB;
  6. use Simps\MQTT\Protocol\Types;
  7. use Simps\MQTT\Protocol\V5;
  8. use Simps\MQTT\Tools\Common;
  9. use Simps\MQTT\Client;
  10. use Simps\MQTT\Config\ClientConfig;
  11. use Simps\MQTT\Hex\ReasonCode;
  12. use Swoole\Coroutine;
  13. use Illuminate\Support\Facades\Redis;
  14. class MQTTUserConfig
  15. {   
  16.     const SIMPS_MQTT_REMOTE_HOST = '*';
  17.     const SIMPS_MQTT_PORT = 1883;
  18.     const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
  19.     const SIMPS_MQTT_USER = 'test*';
  20.     const SIMPS_MQTT_PASSWORD = 'test*';
  21. }
  22. class MqttClientCommand extends Command
  23. {
  24.     protected $signature = 'mqtt:handle {param1}';
  25.     protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
  26.     protected  $mqtt ;
  27.     const SWOOLE_MQTT_CONFIG = [
  28.         'open_mqtt_protocol' => true,
  29.         'package_max_length' => 2 * 1024 * 1024,
  30.         'connect_timeout' => 5.0,
  31.         'write_timeout' => 5.0,
  32.         'read_timeout' => 5.0,
  33.     ];
  34.     //模拟设备
  35.     const CLiENT_IDs = [
  36.         'mqttx_devA',
  37.         'mqttx_devB',
  38.         'mqttx_devC',
  39.         'mqttx_devD'
  40.     ];
  41.     public function __construct()
  42.     {
  43.         parent::__construct();
  44.     }
  45.     public function handle()
  46.     {
  47.         $param1 =$this->argument('param1');
  48. //        $param2 =$this->argument('param2');
  49.         if ($param1=='subscribe') {
  50.             $this->info('启动订阅...');
  51.             $this->subscribeMqtt();
  52.         } elseif ($param1=='public') {
  53.             $this->info('启动发布...');
  54.             $this->publishMQTT();
  55.         }
  56.         echo '\r\n\r\n分配工作执行完成!!!';
  57.     }
  58.     protected function getTestMQTT5ConnectConfig()
  59.         {
  60.             $config = new ClientConfig();
  61.             $UserConfig = new MQTTUserConfig();
  62.             return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
  63.                 ->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
  64.                 ->setClientId(Client::genClientID())
  65.                 ->setKeepAlive(10)
  66.                 ->setDelay(3000) // 3s
  67.                 ->setMaxAttempts(5)
  68.                 ->setProperties([
  69.                     'session_expiry_interval' => 60,
  70.                     'receive_maximum' => 65535,
  71.                     'topic_alias_maximum' => 65535,
  72.                 ])
  73.                 ->setProtocolLevel(5)
  74.                 ->setSwooleConfig( [
  75.                     'open_mqtt_protocol' => true,
  76.                     'package_max_length' => 2 * 1024 * 1024,
  77.                     'connect_timeout' => 5.0,
  78.                     'write_timeout' => 5.0,
  79.                     'read_timeout' => 5.0,
  80.                 ]);
  81.     }
  82.     private function heartbeat($message) {
  83.         if ($message) {
  84.             parse_str($message,$array);
  85.             $device = $array['imei'];
  86.             $hash = ':mqtt:heartbeat:online'.":{$device}";
  87.             Redis::expire($hash,30);  ##30s有效
  88.             Redis::sAdd($hash,1);
  89.         }
  90.     }
  91.     /*
  92.      * 订阅
  93.      *  private function subscribeMqtt(){
  94.         Coroutine\run(function () {
  95.             $client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
  96.             ....
  97.      */
  98.     private function subscribeMqtt(){
  99.         Coroutine\run(function () {
  100.             $UserConfig = new MQTTUserConfig();
  101.             $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
  102.             $this->getTestMQTT5ConnectConfig());
  103.             $will = [
  104.                 'topic' => 'simps-mqtt/dinweiyi/delete',
  105.                 'qos' => 1,
  106.                 'retain' => 0,
  107.                 'message' => 'byebye',
  108.                 'properties' => [
  109.                     'will_delay_interval' => 60,
  110.                     'message_expiry_interval' => 60,
  111.                     'content_type' => 'test',
  112.                     'payload_format_indicator' => true, // false 0 1
  113.                 ],
  114.             ];
  115.             $client->connect(true, $will);
  116.             $topics['simps-mqtt/dinweiyi/subscribe_message'] = [
  117.                 'qos' => 2,
  118.                 'no_local' => true,
  119.                 'retain_as_published' => true,
  120.                 'retain_handling' => 2,
  121.             ];
  122.             $res = $client->subscribe($topics);
  123.             $timeSincePing = time();
  124.             var_dump($res);
  125.             echo '\r\n\r\n connect success !!!';
  126.             while (true) {
  127.                 try {
  128.                     $buffer = $client->recv();
  129.                     $message = null;
  130.                     if ($buffer && $buffer !== true) {
  131.                         $message = $buffer["message"];
  132.                         // QoS1 PUBACK
  133.                         if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
  134.                             $client->send(
  135.                                 [
  136.                                     'type' => Types::PUBACK,
  137.                                     'message_id' => $buffer['message_id'],
  138.                                 ],
  139.                                 false
  140.                             );
  141.                         }
  142.                         if ($buffer['type'] === Types::DISCONNECT) {
  143.                             echo sprintf(
  144.                                 "Broker is disconnected, The reason is %s [%d]\n",
  145.                                 ReasonCode::getReasonPhrase($buffer['code']),
  146.                                 $buffer['code']
  147.                             );
  148.                             $client->close($buffer['code']);
  149.                             break;
  150.                         }
  151.                         $reportObj = new DeviceReportController();
  152.                         $ret = $reportObj->store($message);
  153.                         var_dump("182>>>",$ret);
  154.                         unset($reportObj);
  155.                     }
  156.                     if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
  157.                         $buffer = $client->ping();
  158.                         if ($buffer) {
  159.                             echo 'send ping success ...' ;
  160.                             $this->heartbeat($message);
  161.                             $timeSincePing = time();
  162.                         }
  163.                     }
  164.                 } catch (\Throwable $e) {
  165.                     throw $e;
  166.                 }
  167.             }
  168.         });
  169.     }
  170.     protected function getMessage() {
  171.         $client_ids = [
  172.             'mqttx_devA',
  173. //            'mqttx_devB',
  174.             'mqttx_devC',
  175.             'mqttx_devD'
  176.         ];
  177.         $message = [];
  178.         $message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
  179.         $message['time'] = time();
  180.         $message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
  181.         return json_encode($message);
  182.     }
  183.     /*
  184.      * 发布
  185.      */
  186.     public function publishMQTT() {
  187.         Coroutine\run(function () {
  188.             $UserConfig = new MQTTUserConfig();
  189.             $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
  190.             $this->getTestMQTT5ConnectConfig());
  191.             $client->connect();
  192.             while (true) {
  193.                 $message = $this->getMessage();
  194.                 $response = $client->publish(
  195.                     'simps-mqtt/user/subscribe_message',
  196.                     $message,
  197.                     1,
  198.                     0,
  199.                     0,
  200.                     [
  201.                         'topic_alias' => 1,
  202.                         'message_expiry_interval' => 12,
  203.                     ]
  204.                 );
  205.                 var_dump( 'publishMQTT>>>',$message);
  206.                 Coroutine::sleep(1);
  207.             }
  208.         });
  209.     }
  210. }
复制代码
3, 代码流程图

使用Mermaid语法描述的上述PHP代码的流程图:
     流程分析:


  • 开始:程序启动。
  • 构造函数 __construct:初始化命令行工具。
  • handle 方法:处理命令行输入。
  • param1 参数:根据输入的参数决定是订阅还是发布。
  • 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
  • 调用 publishMQTT:如果参数是public,则调用此方法。
  • Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
  • 创建 MQTT 客户端并毗连:创建MQTT客户端并毗连到服务器。
  • 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
  • 订阅主题:订阅特定的MQTT主题。
  • 吸取消息:持续监听并吸取消息。
  • 处理消息:对吸取到的消息进行处理。
  • 心跳函数 heartbeat:检查设备心跳。
  • 存储消息:将消息存储到数据库或其他存储系统。
  • 是否断开毗连:检查客户端是否断开毗连。
  • 关闭毗连:如果断开,则关闭毗连。
  • Coroutine 运行 publishMQTT:在协程中运行发布方法。
  • 创建 MQTT 客户端并毗连:创建MQTT客户端并毗连到服务器。
  • 循环发布消息:循环发布消息。
  • 获取测试消息:生成要发布的测试消息。
  • 发布消息:将消息发布到MQTT服务器。
  • 结束:程序结束。
配景常驻运行

1,php artisan命令在配景运行


  • 打开您的终端或SSH到您的服务器。
  • 使用nohup命令运行您的Artisan命令进行测试,如下所示
  1. php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe
复制代码
3.命令行的php的版本与web php的版本号要同等
2,使用宝塔的保卫历程开启历程


也可以添加保卫历程。
以上2种最好是只选一个
测试

打开emqx web ,在欣赏器输入http://127.0.0.0.1:18083/#/websocket


主题:
主题跟php代码内的主题是同等的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容
检查发送的结果

打开数据库,检查device_report表是否成功。成功应下图所示:

实操完成


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表