RabbitMQ开启MQTT协议
设备端使用的是MQTT协议进行通信,当前系统的基础设施已经有了RabbitMQ,思量到维护,所以通过RabbitMQ来将设备接入到系统中。
当前RabbitMQ的版本为3.9.16,可通过插件开启MQTT协议,但是当前版本仅支持MQTT3版本协议,较新的RabbitMQ支持MQTT5。
- rabbitmq-plugins enable rabbitmq_mqtt
复制代码
- 配置MQTT插件
在RabbitMQ的配置文件中加入MQTT配置,默认的配置文件位置在/etc/rabbitmq/rabbitmq.conf,没有的话可以创建该配置文件。具体配置可检察RabbitMQ官网文档。
- # MQTT config
- mqtt.exchange = device_mqtt # 指定MQTT协议的交换机,如果不指定就使用默认的交换机
- mqtt.tcp.default = 1883 # 设置MQTT监听的端口(默认是1883)
- mqtt.allow_anonymous = false # 允许匿名访问(生产环境中通常建议使用用户名和密码)。
- mqtt.vhost = / # 设置MQTT虚拟主机。
- mqtt.exchange_durable = true # 设置交换机是否持久化。
- mqtt.exchange_auto_delete = false # 设置交换机是否在最后一个队列删除后自动删除。
复制代码- sudo systemctl restart rabbitmq-server
- # 或者
- sudo service rabbitmq-server restart
复制代码 自定义客户端毗连鉴权
由于设备端的4G模块已经封装死了MQTT毗连,不能对暗码生成规则进行更改,所以更改RabbitMQ端的鉴权规则。
编写RabbitMQ的鉴权规则有一定难度,同时也不够灵活,将鉴权逻辑移到系齐备一的鉴权中央比较符合。RabbitMQ本身是不支持调用外部http接口进行鉴权,可以通过rabbitmq_auth_backend_http插件实现调用外部http接口鉴权。具体配置可以看插件github地址
- sudo rabbitmq-plugins enable rabbitmq_auth_backend_http
- sudo rabbitmq-plugins enable rabbitmq_auth_backend_cache
复制代码
- 编辑RabbitMQ配置
修改前面提到的/etc/rabbitmq/rabbitmq.conf配置文件。
- # 首先使用cache(缓存)进行认证
- auth_backends.1 = cache
- # 其次使用RabbitMQ内置认证
- auth_backends.2 = internal
- # 该配置指定了缓存后端所依赖的实际认证后端为 http。也就是说,当使用 cache 认证后端时,它会从 http 后端获取认证信息并进行缓存。如果缓存中存在有效的认证信息,则直接使用缓存数据,否则会通过 http 后端去获取并更新缓存。
- auth_cache.cached_backend = http
- # 认证请求类型
- auth_http.http_method = post
- # 用于获取用户认证信息的 http 地址
- auth_http.user_path = http://localhost:8080/v1/rabbitmqAuth/user
- # 用于虚拟主机(vhost)相关认证和授权的 http 地址
- auth_http.vhost_path = http://localhost:8080/v1/rabbitmqAuth/vhost
- # 资源相关的认证和授权的 http 地址
- auth_http.resource_path = http://localhost:8080/v1/rabbitmqAuth/resource
- # 主题(topic)相关的认证和授权的 http 地址
- auth_http.topic_path = http://localhost:8080/v1/rabbitmqAuth/topic
- # 缓存时间,单位毫秒
- auth_cache.cache_ttl = 60000
复制代码
- 实现认证逻辑
这里以Spring应用为例。也可以检察官方认证模块工程。RabbitMqAuthBackendHttpController类,接口入参检察官方文档。
- @Slf4j
- @RestController
- @RequestMapping(path = "/v1/rabbitmqAuth")
- public class RabbitMqAuthBackendHttpController {
- @Autowired
- private RabbitmqAuthService rabbitmqAuthService;
- @ApiOperation("校验用户信息")
- @PostMapping("/user")
- public String user(@RequestParam Map<String,String> userCheck) {
- return rabbitmqAuthService.checkUser(userCheck);
- }
- @ApiOperation("校验vhost")
- @PostMapping("/vhost")
- public String vhost(VirtualHostCheck check) {
- return rabbitmqAuthService.checkVhost(check);
- }
- @ApiOperation("校验资源")
- @PostMapping("/resource")
- public String resource(ResourceCheck check) {
- return rabbitmqAuthService.checkResource(check);
- }
- @ApiOperation("校验Topic")
- @PostMapping("/topic")
- public String topic(TopicCheck check) {
- return rabbitmqAuthService.checkTopic(check);
- }
- }
复制代码 鉴权核心逻辑在RabbitmqAuthServiceImpl类
- @Slf4j
- @Service
- public class RabbitmqAuthServiceImpl implements RabbitmqAuthService {
- /**
- * 校验成功
- */
- private static final String SUCCESS="allow";
- /**
- * 校验失败
- */
- private static final String FAIL="deny";
- @Autowired
- private IDeviceRabbitmqAuthService deviceRabbitmqAuthService;
- /**
- * 校验RabbitMQ用户
- * @param userCheck
- * @return
- * <ul>
- * <li>deny:拒绝访问 user / vhost / resource</li>
- * <li>allow:允许访问user / vhost / resource</li>
- * </ul>
- */
- @Override
- public String checkUser(Map<String, String> userCheck) {
- String username = userCheck.get("username");
- if (ObjectUtil.isNotEmpty(username) && !username.contains("&")){
- // 业务需要,只有部分账号的校验才需要http远程调用
- return FAIL;
- }
- String password= userCheck.get("password");
- String vhost= userCheck.get("vhost");
- String clientId= userCheck.get("client_id");
- // 加载配置的账号信息
- DeviceRabbitmqAuth deviceRabbitmqAuth = deviceRabbitmqAuthService.selectDeviceRabbitmqAuthByDevice(username);
- // 验证设备连接
- checkDeviceConnect(password,clientId,deviceRabbitmqAuth);
- // 转换权限
- List<String> tags = RabbitMQTagsEnums.permissionCovertTag(deviceRabbitmqAuth.getPermission());
- // 校验成功
- return SUCCESS+" " + StringUtils.collectionToDelimitedString(tags, " ");
- }
- /**
- * 校验设备端连接
- * @param password 设备连接的密码
- * @param clientId 设备连接的clientId
- * @param deviceRabbitmqAuth 存储的设备信息
- * @return
- */
- public void checkDeviceConnect(String password,String clientId,DeviceRabbitmqAuth deviceRabbitmqAuth) {
- if (deviceRabbitmqAuth==null){
- throw new RabbitmqAuthException("未找到连接信息");
- }
- // TDO 不同4G模块使用加密算法、加密方式不同,根据设备自定义账号校验逻辑
- }
- /**
- * 校验vhost
- * @param virtualHostCheck
- * @return
- * <ul>
- * <li>deny:拒绝访问 user / vhost / resource</li>
- * <li>allow:允许访问user / vhost / resource</li>
- * </ul>
- */
- @Override
- public String checkVhost(VirtualHostCheck virtualHostCheck) {
- log.debug("RabbitMqCheck 校验vhost:{}", JSONObject.toJSONString(virtualHostCheck));
- return SUCCESS;
- }
- /**
- * 资源认证
- * @param resourceCheck
- * @return
- * <ul>
- * <li>deny:拒绝访问 user / vhost / resource</li>
- * <li>allow:允许访问user / vhost / resource</li>
- * </ul>
- */
- @Override
- public String checkResource(ResourceCheck resourceCheck) {
- log.debug("RabbitMqCheck 校验资源:{}", JSONObject.toJSONString(resourceCheck));
- return SUCCESS;
- }
- /**
- * 主题认证
- * @param topicCheck
- * @return
- * <ul>
- * <li>deny:拒绝访问 user / vhost / resource</li>
- * <li>allow:允许访问user / vhost / resource</li>
- * </ul>
- */
- @Override
- public String checkTopic(TopicCheck topicCheck) {
- log.debug("RabbitMqCheck 校验Topic:{}", JSONObject.toJSONString(topicCheck));
- return SUCCESS;
- }
- }
复制代码- sudo systemctl restart rabbitmq-server
- # 或者
- sudo service rabbitmq-server restart
复制代码 RabbitMQ鉴权流程:
设备接入系统
设备与系统通信流程:
设备端使用MQTT协议与系统进行交互。互换时机将MQTT协议数据转换为AMQP协议数据转发到绑定的队列。
由于一些缘故原由,服务器端监听MQTT协议的数据时,服务器端发送MQTT协议的数据,出现了多个监听队列都收到消息了。所以服务器端监听AMQP协议的数据,只有发送时才使用MQTT协议。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |