设备通过RabbitMQ接入自建云平台

乌市泽哥  论坛元老 | 2025-4-9 08:25:12 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1847|帖子 1847|积分 5541

RabbitMQ开启MQTT协议

    设备端使用的是MQTT协议进行通信,当前系统的基础设施已经有了RabbitMQ,思量到维护,所以通过RabbitMQ来将设备接入到系统中。
    当前RabbitMQ的版本为3.9.16,可通过插件开启MQTT协议,但是当前版本仅支持MQTT3版本协议,较新的RabbitMQ支持MQTT5。

  • 开启MQTT协议插件下令
  1. rabbitmq-plugins enable rabbitmq_mqtt
复制代码

  • 配置MQTT插件
    在RabbitMQ的配置文件中加入MQTT配置,默认的配置文件位置在/etc/rabbitmq/rabbitmq.conf,没有的话可以创建该配置文件。具体配置可检察RabbitMQ官网文档。
  1. # MQTT config
  2. mqtt.exchange = device_mqtt # 指定MQTT协议的交换机,如果不指定就使用默认的交换机
  3. mqtt.tcp.default = 1883 # 设置MQTT监听的端口(默认是1883)
  4. mqtt.allow_anonymous = false # 允许匿名访问(生产环境中通常建议使用用户名和密码)。
  5. mqtt.vhost = / # 设置MQTT虚拟主机。
  6. mqtt.exchange_durable = true # 设置交换机是否持久化。
  7. mqtt.exchange_auto_delete = false # 设置交换机是否在最后一个队列删除后自动删除。
复制代码

  • 重启RabbitMQ服务
  1. sudo systemctl restart rabbitmq-server
  2. # 或者
  3. sudo service rabbitmq-server restart
复制代码
自定义客户端毗连鉴权

    由于设备端的4G模块已经封装死了MQTT毗连,不能对暗码生成规则进行更改,所以更改RabbitMQ端的鉴权规则。
    编写RabbitMQ的鉴权规则有一定难度,同时也不够灵活,将鉴权逻辑移到系齐备一的鉴权中央比较符合。RabbitMQ本身是不支持调用外部http接口进行鉴权,可以通过rabbitmq_auth_backend_http插件实现调用外部http接口鉴权。具体配置可以看插件github地址

  • 启用插件
  1. sudo rabbitmq-plugins enable rabbitmq_auth_backend_http
  2. sudo rabbitmq-plugins enable rabbitmq_auth_backend_cache
复制代码

  • 编辑RabbitMQ配置
    修改前面提到的/etc/rabbitmq/rabbitmq.conf配置文件。
  1. # 首先使用cache(缓存)进行认证
  2. auth_backends.1 = cache
  3. # 其次使用RabbitMQ内置认证
  4. auth_backends.2 = internal
  5. # 该配置指定了缓存后端所依赖的实际认证后端为 http。也就是说,当使用 cache 认证后端时,它会从 http 后端获取认证信息并进行缓存。如果缓存中存在有效的认证信息,则直接使用缓存数据,否则会通过 http 后端去获取并更新缓存。
  6. auth_cache.cached_backend = http
  7. # 认证请求类型
  8. auth_http.http_method   = post
  9. # 用于获取用户认证信息的 http 地址
  10. auth_http.user_path = http://localhost:8080/v1/rabbitmqAuth/user
  11. # 用于虚拟主机(vhost)相关认证和授权的 http 地址
  12. auth_http.vhost_path = http://localhost:8080/v1/rabbitmqAuth/vhost
  13. # 资源相关的认证和授权的 http 地址
  14. auth_http.resource_path = http://localhost:8080/v1/rabbitmqAuth/resource
  15. # 主题(topic)相关的认证和授权的 http 地址
  16. auth_http.topic_path = http://localhost:8080/v1/rabbitmqAuth/topic
  17. # 缓存时间,单位毫秒
  18. auth_cache.cache_ttl = 60000
复制代码

  • 实现认证逻辑
    这里以Spring应用为例。也可以检察官方认证模块工程。RabbitMqAuthBackendHttpController类,接口入参检察官方文档。
  1. @Slf4j
  2. @RestController
  3. @RequestMapping(path = "/v1/rabbitmqAuth")
  4. public class RabbitMqAuthBackendHttpController {
  5.     @Autowired
  6.     private RabbitmqAuthService rabbitmqAuthService;
  7.     @ApiOperation("校验用户信息")
  8.     @PostMapping("/user")
  9.     public String user(@RequestParam Map<String,String> userCheck) {
  10.         return rabbitmqAuthService.checkUser(userCheck);
  11.     }
  12.     @ApiOperation("校验vhost")
  13.     @PostMapping("/vhost")
  14.     public String vhost(VirtualHostCheck check) {
  15.         return rabbitmqAuthService.checkVhost(check);
  16.     }
  17.     @ApiOperation("校验资源")
  18.     @PostMapping("/resource")
  19.     public String resource(ResourceCheck check) {
  20.         return rabbitmqAuthService.checkResource(check);
  21.     }
  22.     @ApiOperation("校验Topic")
  23.     @PostMapping("/topic")
  24.     public String topic(TopicCheck check) {
  25.         return rabbitmqAuthService.checkTopic(check);
  26.     }
  27. }
复制代码
    鉴权核心逻辑在RabbitmqAuthServiceImpl类
  1. @Slf4j
  2. @Service
  3. public class RabbitmqAuthServiceImpl implements RabbitmqAuthService {
  4.     /**
  5.      * 校验成功
  6.      */
  7.     private static final String SUCCESS="allow";
  8.     /**
  9.      * 校验失败
  10.      */
  11.     private static final String FAIL="deny";
  12.     @Autowired
  13.     private IDeviceRabbitmqAuthService deviceRabbitmqAuthService;
  14.     /**
  15.      * 校验RabbitMQ用户
  16.      * @param userCheck
  17.      * @return
  18.      * <ul>
  19.      *     <li>deny:拒绝访问 user / vhost / resource</li>
  20.      *     <li>allow:允许访问user / vhost / resource</li>
  21.      * </ul>
  22.      */
  23.     @Override
  24.     public String checkUser(Map<String, String> userCheck) {
  25.         String username = userCheck.get("username");
  26.         if (ObjectUtil.isNotEmpty(username) && !username.contains("&")){
  27.             // 业务需要,只有部分账号的校验才需要http远程调用
  28.             return FAIL;
  29.         }
  30.         String password=  userCheck.get("password");
  31.         String vhost=  userCheck.get("vhost");
  32.         String clientId= userCheck.get("client_id");
  33.         // 加载配置的账号信息
  34.         DeviceRabbitmqAuth deviceRabbitmqAuth = deviceRabbitmqAuthService.selectDeviceRabbitmqAuthByDevice(username);
  35.         // 验证设备连接
  36.         checkDeviceConnect(password,clientId,deviceRabbitmqAuth);
  37.         // 转换权限
  38.         List<String> tags = RabbitMQTagsEnums.permissionCovertTag(deviceRabbitmqAuth.getPermission());
  39.         // 校验成功
  40.         return SUCCESS+" " + StringUtils.collectionToDelimitedString(tags, " ");
  41.     }
  42.     /**
  43.      * 校验设备端连接
  44.      * @param password 设备连接的密码
  45.      * @param clientId 设备连接的clientId
  46.      * @param deviceRabbitmqAuth 存储的设备信息
  47.      * @return
  48.      */
  49.     public void checkDeviceConnect(String password,String clientId,DeviceRabbitmqAuth deviceRabbitmqAuth) {
  50.         if (deviceRabbitmqAuth==null){
  51.             throw new RabbitmqAuthException("未找到连接信息");
  52.         }
  53.         // TDO 不同4G模块使用加密算法、加密方式不同,根据设备自定义账号校验逻辑
  54.     }
  55.     /**
  56.      * 校验vhost
  57.      * @param virtualHostCheck
  58.      * @return
  59.      * <ul>
  60.      *     <li>deny:拒绝访问 user / vhost / resource</li>
  61.      *     <li>allow:允许访问user / vhost / resource</li>
  62.      * </ul>
  63.      */
  64.     @Override
  65.     public String checkVhost(VirtualHostCheck virtualHostCheck) {
  66.         log.debug("RabbitMqCheck 校验vhost:{}", JSONObject.toJSONString(virtualHostCheck));
  67.         return SUCCESS;
  68.     }
  69.     /**
  70.      * 资源认证
  71.      * @param resourceCheck
  72.      * @return
  73.      * <ul>
  74.      *     <li>deny:拒绝访问 user / vhost / resource</li>
  75.      *     <li>allow:允许访问user / vhost / resource</li>
  76.      * </ul>
  77.      */
  78.     @Override
  79.     public String checkResource(ResourceCheck resourceCheck) {
  80.         log.debug("RabbitMqCheck 校验资源:{}", JSONObject.toJSONString(resourceCheck));
  81.         return SUCCESS;
  82.     }
  83.     /**
  84.      * 主题认证
  85.      * @param topicCheck
  86.      * @return
  87.      * <ul>
  88.      *     <li>deny:拒绝访问 user / vhost / resource</li>
  89.      *     <li>allow:允许访问user / vhost / resource</li>
  90.      * </ul>
  91.      */
  92.     @Override
  93.     public String checkTopic(TopicCheck topicCheck) {
  94.         log.debug("RabbitMqCheck 校验Topic:{}", JSONObject.toJSONString(topicCheck));
  95.         return SUCCESS;
  96.     }
  97. }
复制代码

  • 重启RabbitMQ服务
  1. sudo systemctl restart rabbitmq-server
  2. # 或者
  3. sudo service rabbitmq-server restart
复制代码
RabbitMQ鉴权流程:

设备接入系统

设备与系统通信流程:

    设备端使用MQTT协议与系统进行交互。互换时机将MQTT协议数据转换为AMQP协议数据转发到绑定的队列。
   由于一些缘故原由,服务器端监听MQTT协议的数据时,服务器端发送MQTT协议的数据,出现了多个监听队列都收到消息了。所以服务器端监听AMQP协议的数据,只有发送时才使用MQTT协议。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表