thinkphp8 使用think-worker 单服务器同时可使用tcp(Gateway)websocket和 ...

打印 上一主题 下一主题

主题 802|帖子 802|积分 2406

1.安装tp8
  1. composer create-project topthink/think tp
复制代码
2.安装tp集成的workerman:(自带GatewayWorker)
  1. composer require topthink/think-worker
复制代码
3.安装GatewayClient(TCP绑定设备下发指令用)
  1. composer require workerman/gatewayclient
复制代码
4.安装workerman/mqtt:
  1. composer require workerman/mqtt
复制代码
  介绍:GatewayWorker使用经典的Gateway和Worker进程模子。Gateway进程负责维持客户端毗连,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。
  要注意的几个点:
1.businessWorker Events里onWorkerStart启动时同时毗连mqtt服务器 :
php think worker:gateway
  1. namespace app\worker;
  2. use GatewayWorker\Lib\Gateway;
  3. use Workerman\Worker;
  4. use think\facade\Db;
  5. use Workerman\Mqtt\Client as MqttClient;
  6. use Workerman\Lib\Timer;
  7. use think\facade\Log;
  8. use app\model\Company as CompanyModel;
  9. /**
  10. * Worker 命令行服务类
  11. */
  12. class TcpEvents
  13. {
  14.     private static $mqtt = null;
  15.     /**
  16.      * @var true
  17.      */
  18.     private static bool $mqtt_connected;
  19.     /**
  20.      * onWorkerStart 事件回调
  21.      * 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次
  22.      *
  23.      * @access public
  24.      * @param  \Workerman\Worker    $businessWorker
  25.      * @return void
  26.      */
  27.     public static function onWorkerStart(Worker $businessWorker)
  28.     {
  29.         $options = [
  30.             'username' => 'MQTT',
  31.             'password' => 'MQTTPW',
  32.         ];
  33.         $mqtt = new MqttClient('mqtt://0.0.0.0:1883',$options);
  34.         $mqtt->onConnect = function($mqtt) {
  35.             $mqtt->subscribe('YK/#');
  36.             self::$mqtt_connected = true;//mqtt连接成功 下面判断用
  37.         };
  38.         $mqtt->onMessage = function($topic, $content, $mqtt){
  39.             var_dump($topic.'主题信息:', $content);
  40.                         if($content == 'getParam'){
  41.                                 $arr = array('topic'=>'TPR/869219071430769/setParam', 'content'=>'{"FH":"1.200","FL":"0.001","TH":"50.0","TL":"0.0","AT":"200","CT":"3","FF":"1.600","TF":"5.0","FB":"0.1","TB":"2.0","BR":"0"}');
  42.                                 $mqtt->publish($arr['topic'], $arr['content']);
  43.                                
  44.                         }
  45.             
  46.             Log::write('MQTT设备发送的信息:'.$topic,'notice');
  47.         };
  48.         $mqtt->connect();
  49.         static::$mqtt = $mqtt;//mqtt对象下面用
  50.         // Gateway::sendToAll('hello');
  51.         //$app = new Application;
  52.         //$app->initialize();
  53.         Log::write('TCP设备发送的信息:','notice');
  54.     }
复制代码
说下TCP和websocket 以及Mqtt实现客户端UID的绑定和下发指令

常用两种方式绑定uid
一、第一种方式 通过GatewayClient(websocket常用 物联网设备毗连时不能如许)
1.网站页面创建与GatewayWorker的websocket毗连
2. GatewayWorker发现有页面发起毗连时,将对应毗连的client_id发给网站页面
3. 网站页面收到client_id后触发一个ajax请求(假设是bind.php)将client_id发到tp后端
4. mvc后端bind.php收到client_id后使用GatewayClient调用Gateway::bindUid($client_id,                                    u                         i                         d                         )                         将                         c                         l                         i                         e                         n                                   t                            i                                  d                         与当前                         u                         i                         d                         (                         用户                         i                         d                         大概客户端唯一标识                         )                         绑定。如果有群组、群发功能,也可以使用                         G                         a                         t                         e                         w                         a                         y                         :                         :                         j                         o                         i                         n                         G                         r                         o                         u                         p                         (                              uid)将client_id与当前uid(用户id大概客户端唯一标识)绑定。如果有群组、群发功能,也可以使用Gateway::joinGroup(                  uid)将clienti​d与当前uid(用户id大概客户端唯一标识)绑定。如果有群组、群发功能,也可以使用Gateway::joinGroup(client_id, $group_id)将client_id参加到对应分组
二、第二种方式(Gatewayworker) 大概更多用于物联网设备绑定
1、网站页面创建与GatewayWorker的websocket毗连(设备创建TCP大概MQtt毗连)
2、js把网站携带的uid发送到websocket服务器 socket.send(uid);(设备自动上报信息携带其序列号 Mqtt每个设备发布主题带设备编号 必如 Device/{id}/+)
3、businessWorker Events触发onMessage变乱 调用Gateway::bindUid 实现uid和clientid绑定 以便下一步发送指令(TCP根据设备上报的内容获得设备编号 Mqtt根据主题解析出设备编号)
同样也常用两种方式下发指令
一、第一种通过GatewayClient (Mqtt不能用这个方式,Mqtt是通过发布主题实现命令下发)
前面绑定后 tp框架处理业务过程中需要向某个uid大概某个群组发送数据时,直接调用GatewayClient的接口Gateway::sendToUid Gateway::sendToGroup 等发送即可
二、通过直接毗连Gatewayworker作为一个客户端去和别的客户端通信 实在就是实现GatewayClient (Websocket如果实现PHP后端下发指令不大好操作,js很方便 )
1、thinkphp 毗连tcp 给gatewayworker发送信息 以Mqtt发布主题为例
  1. $client = stream_socket_client('tcp://127.0.0.1:2348', $errno, $errmsg, 30);
  2. $data = array('topic'=>'869219071430769/set', 'content'=>'{"FH":"1.200","FL":"0.001","TH":"50.0","TL":"0.0","AT":"2","CT":"3","FF":"1.600","TF":"5.0","FB":"0.1","TB":"2.0","BR":"0"}','options'=>array('qos'=>0,'retain'=>0));
  3. // 发送数据,
  4.         fwrite($client, json_encode($data)."\n");
  5. // 读取推送结果
  6.         echo fread($client, 8192);
复制代码
2.gatewayworker会转发给businessWorker 处理, Events触发onMessage变乱,调用Gateway::sendToUid 给客户端发送指令 (Mqtt是发送主题设备订阅$mqtt->publish(‘topic’,‘content’)
   不管用那种方式绑定了客户端和uid 都可以使用GatewayClient给uid发送指令,我们的目的就是为了用GatewayClient 所以建议只管用第一种方式,Mqtt没办法只能用第二种,固然也可以和官方那样重新开个worker单独毗连mqtt服务器去发布主题 大概配合别的phpMqttclient扩展,不用worker也可以实现毗连Mqtt服务器下发指令
  注意:一个Gatewayworker实例 不能同时开TCP和websocket ,如果单台服务器需要同时支持TCP和websocket 需要开两个 Gatewayworker实例,下面是步调 (固然也可以用一个Gatewayworker同时开TCP+mqq,再用一个workerman单开Websocket 如许应该更简单些,而且防止端口冲突,但是我是为了都用GatewayClient的api所以如许操作)

1、topthink/think-worker/src/command/GatewayWorker.php 复制一份改名为 GatewayWorker2.php
内里的类名改为GatewayWorker2 调用配置文件 改为
  1. $option = Config::get('gateway_worker2');
复制代码
2、config里的gateway_worker 复制一份改名为gateway_worker2 修改内里的配置(一个为tcp一个为websocket) port registerAddress端口 startPort eventHandler name等都根据本身需求改下,特殊注意,增长配置:
  1. 'pidFile' => app()->getRuntimePath().'gateway_worker2.pid',
复制代码
因为tp集成的think-worker版本比力低默认的pidFile是固定的 两个Gatewayworker实例必须两个pidFile
3.修改命令:topthink/think-worker/src/Service.php 增长一条
  1. 'worker:gateway2' => '\\think\\worker\\command\\GatewayWorker2',
复制代码
另有一个注意事项,因为用了thinkphp集成的workerman(think-worker)版本是3.5不是最新的 ,使用workerman/mqtt有点问题,定时器类引用位置要改下,暂时没发现别的问题

  1. namespace Workerman\Mqtt;
  2. use \Workerman\Connection\AsyncTcpConnection;
  3. use Workerman\Mqtt\Consts\MQTTConst;
  4. use Workerman\Mqtt\Consts\ReasonCodeConst;
  5. use \Workerman\Protocols\mqtt;
  6. use \Workerman\Lib\Timer;//这里要改
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

兜兜零元

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表