乌市泽哥 发表于 2025-4-9 08:25:12

设备通过RabbitMQ接入自建云平台

RabbitMQ开启MQTT协议

    设备端使用的是MQTT协议进行通信,当前系统的基础设施已经有了RabbitMQ,思量到维护,所以通过RabbitMQ来将设备接入到系统中。
    当前RabbitMQ的版本为3.9.16,可通过插件开启MQTT协议,但是当前版本仅支持MQTT3版本协议,较新的RabbitMQ支持MQTT5。

[*]开启MQTT协议插件下令
rabbitmq-plugins enable rabbitmq_mqtt

[*]配置MQTT插件
在RabbitMQ的配置文件中加入MQTT配置,默认的配置文件位置在/etc/rabbitmq/rabbitmq.conf,没有的话可以创建该配置文件。具体配置可检察RabbitMQ官网文档。
# MQTT config
mqtt.exchange = device_mqtt # 指定MQTT协议的交换机,如果不指定就使用默认的交换机
mqtt.tcp.default = 1883 # 设置MQTT监听的端口(默认是1883)
mqtt.allow_anonymous = false # 允许匿名访问(生产环境中通常建议使用用户名和密码)。
mqtt.vhost = / # 设置MQTT虚拟主机。
mqtt.exchange_durable = true # 设置交换机是否持久化。
mqtt.exchange_auto_delete = false # 设置交换机是否在最后一个队列删除后自动删除。

[*]重启RabbitMQ服务
sudo systemctl restart rabbitmq-server
# 或者
sudo service rabbitmq-server restart
自定义客户端毗连鉴权

    由于设备端的4G模块已经封装死了MQTT毗连,不能对暗码生成规则进行更改,所以更改RabbitMQ端的鉴权规则。
    编写RabbitMQ的鉴权规则有一定难度,同时也不够灵活,将鉴权逻辑移到系齐备一的鉴权中央比较符合。RabbitMQ本身是不支持调用外部http接口进行鉴权,可以通过rabbitmq_auth_backend_http插件实现调用外部http接口鉴权。具体配置可以看插件github地址

[*]启用插件
sudo rabbitmq-plugins enable rabbitmq_auth_backend_http

sudo rabbitmq-plugins enable rabbitmq_auth_backend_cache

[*]编辑RabbitMQ配置
修改前面提到的/etc/rabbitmq/rabbitmq.conf配置文件。
# 首先使用cache(缓存)进行认证
auth_backends.1 = cache
# 其次使用RabbitMQ内置认证
auth_backends.2 = internal
# 该配置指定了缓存后端所依赖的实际认证后端为 http。也就是说,当使用 cache 认证后端时,它会从 http 后端获取认证信息并进行缓存。如果缓存中存在有效的认证信息,则直接使用缓存数据,否则会通过 http 后端去获取并更新缓存。
auth_cache.cached_backend = http
# 认证请求类型
auth_http.http_method   = post
# 用于获取用户认证信息的 http 地址
auth_http.user_path = http://localhost:8080/v1/rabbitmqAuth/user
# 用于虚拟主机(vhost)相关认证和授权的 http 地址
auth_http.vhost_path = http://localhost:8080/v1/rabbitmqAuth/vhost
# 资源相关的认证和授权的 http 地址
auth_http.resource_path = http://localhost:8080/v1/rabbitmqAuth/resource
# 主题(topic)相关的认证和授权的 http 地址
auth_http.topic_path = http://localhost:8080/v1/rabbitmqAuth/topic
# 缓存时间,单位毫秒
auth_cache.cache_ttl = 60000

[*]实现认证逻辑
这里以Spring应用为例。也可以检察官方认证模块工程。RabbitMqAuthBackendHttpController类,接口入参检察官方文档。
@Slf4j
@RestController
@RequestMapping(path = "/v1/rabbitmqAuth")
public class RabbitMqAuthBackendHttpController {
    @Autowired
    private RabbitmqAuthService rabbitmqAuthService;

    @ApiOperation("校验用户信息")
    @PostMapping("/user")
    public String user(@RequestParam Map<String,String> userCheck) {
      return rabbitmqAuthService.checkUser(userCheck);
    }
    @ApiOperation("校验vhost")
    @PostMapping("/vhost")
    public String vhost(VirtualHostCheck check) {
      return rabbitmqAuthService.checkVhost(check);
    }
    @ApiOperation("校验资源")
    @PostMapping("/resource")
    public String resource(ResourceCheck check) {
      return rabbitmqAuthService.checkResource(check);
    }
    @ApiOperation("校验Topic")
    @PostMapping("/topic")
    public String topic(TopicCheck check) {
      return rabbitmqAuthService.checkTopic(check);
    }
}
    鉴权核心逻辑在RabbitmqAuthServiceImpl类
@Slf4j
@Service
public class RabbitmqAuthServiceImpl implements RabbitmqAuthService {
    /**
   * 校验成功
   */
    private static final String SUCCESS="allow";
    /**
   * 校验失败
   */
    private static final String FAIL="deny";
    @Autowired
    private IDeviceRabbitmqAuthService deviceRabbitmqAuthService;
    /**
   * 校验RabbitMQ用户
   * @param userCheck
   * @return
   * <ul>
   *   <li>deny:拒绝访问 user / vhost / resource</li>
   *   <li>allow:允许访问user / vhost / resource</li>
   * </ul>
   */
    @Override
    public String checkUser(Map<String, String> userCheck) {
      String username = userCheck.get("username");
      if (ObjectUtil.isNotEmpty(username) && !username.contains("&")){
            // 业务需要,只有部分账号的校验才需要http远程调用
            return FAIL;
      }
      String password=userCheck.get("password");
      String vhost=userCheck.get("vhost");
      String clientId= userCheck.get("client_id");
      // 加载配置的账号信息
      DeviceRabbitmqAuth deviceRabbitmqAuth = deviceRabbitmqAuthService.selectDeviceRabbitmqAuthByDevice(username);
      // 验证设备连接
      checkDeviceConnect(password,clientId,deviceRabbitmqAuth);
      // 转换权限
      List<String> tags = RabbitMQTagsEnums.permissionCovertTag(deviceRabbitmqAuth.getPermission());
      // 校验成功
      return SUCCESS+" " + StringUtils.collectionToDelimitedString(tags, " ");
    }

    /**
   * 校验设备端连接
   * @param password 设备连接的密码
   * @param clientId 设备连接的clientId
   * @param deviceRabbitmqAuth 存储的设备信息
   * @return
   */
    public void checkDeviceConnect(String password,String clientId,DeviceRabbitmqAuth deviceRabbitmqAuth) {
      if (deviceRabbitmqAuth==null){
            throw new RabbitmqAuthException("未找到连接信息");
      }
      // TDO 不同4G模块使用加密算法、加密方式不同,根据设备自定义账号校验逻辑
    }
    /**
   * 校验vhost
   * @param virtualHostCheck
   * @return
   * <ul>
   *   <li>deny:拒绝访问 user / vhost / resource</li>
   *   <li>allow:允许访问user / vhost / resource</li>
   * </ul>
   */
    @Override
    public String checkVhost(VirtualHostCheck virtualHostCheck) {
      log.debug("RabbitMqCheck 校验vhost:{}", JSONObject.toJSONString(virtualHostCheck));
      return SUCCESS;
    }
    /**
   * 资源认证
   * @param resourceCheck
   * @return
   * <ul>
   *   <li>deny:拒绝访问 user / vhost / resource</li>
   *   <li>allow:允许访问user / vhost / resource</li>
   * </ul>
   */
    @Override
    public String checkResource(ResourceCheck resourceCheck) {
      log.debug("RabbitMqCheck 校验资源:{}", JSONObject.toJSONString(resourceCheck));
      return SUCCESS;
    }
    /**
   * 主题认证
   * @param topicCheck
   * @return
   * <ul>
   *   <li>deny:拒绝访问 user / vhost / resource</li>
   *   <li>allow:允许访问user / vhost / resource</li>
   * </ul>
   */
    @Override
    public String checkTopic(TopicCheck topicCheck) {
      log.debug("RabbitMqCheck 校验Topic:{}", JSONObject.toJSONString(topicCheck));
      return SUCCESS;
    }
}


[*]重启RabbitMQ服务
sudo systemctl restart rabbitmq-server
# 或者
sudo service rabbitmq-server restart
RabbitMQ鉴权流程:
https://i-blog.csdnimg.cn/direct/1ef5d7760df2485eb20e020cd9a11527.png
设备接入系统

设备与系统通信流程:
https://i-blog.csdnimg.cn/direct/9f3ac2a93847419e893f9e2bda2c0d3d.png
    设备端使用MQTT协议与系统进行交互。互换时机将MQTT协议数据转换为AMQP协议数据转发到绑定的队列。
   由于一些缘故原由,服务器端监听MQTT协议的数据时,服务器端发送MQTT协议的数据,出现了多个监听队列都收到消息了。所以服务器端监听AMQP协议的数据,只有发送时才使用MQTT协议。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 设备通过RabbitMQ接入自建云平台