部门老大狂问我:RabbitMQ多斲丧者序次性斲丧消息实现 ...

打印 上一主题 下一主题

主题 343|帖子 343|积分 1029



迩来起了个项目消息中心,用来中转各个系统中产生的消息,用到的是RabbitMQ,由于UAT情况、生产情况每台斲丧者服务都是多台,有些消息要求按序次斲丧,以是须要采取肯定的措施包管消息的序次斲丧,下面讲下我们不断优化的三种方法:
1、我们最开始思量的比较简朴,采用的direct交换机,指定特定斲丧者服务器监听队列,其他斲丧者服务器不监听。好比如今有C1、C2、C3三台斲丧者机器,我们决定C1斲丧消息,C2、C3不监听。我们在启动C1的时间,启动脚本中添加C1_IP,在代码中做处置惩罚,斲丧者服务器启动时,如果当前服务器IP就是启动脚本的C1_IP,那就会由这台C1来监听并斲丧消息。这种方式有个单点故障问题,如果C1服务器宕机,那么整个消息中心剩余两个节点都无法斲丧这个队列,导致队列消息堆积。如果有丰富的监控措施,那么监控到C1宕机后,可通过手动配置C2_IP(或者C3_IP)到启动脚本,重启C2服务器(C3服务器)斲丧消息。
2、为了办理单点故障问题,我们采用了fanout交换机,每个斲丧者创建一个专用的queue,如许如果生产者产生两条有先后序次的消息m1和m2(它们有公共的批次号batchNo和唯一的消息编号msgID),就会给每个queue都推送,如下图所示。同时斲丧者斲丧的时间须要配合数据库共同实施,斲丧者监听到消息后就入库(落库内容包括m1消息信息和斲丧者IP),根据msgID唯一索引性如果入库了则本身抛弃消息,斲丧m2时,须要从库表中取出m1的斲丧者IP是否是当前IP,如果不是则抛弃消息。但是这个方案有个缺点:如果consumer1斲丧了m1后挂掉了,m2只能比及consumer1正常后才气斲丧,无法转移到其他斲丧者进行斲丧,如许会对一些业务场景不友好(当然这个地方可以思量死信交换机死信队列进行转移,只不过架构更复杂了)。


3.第三种方式跟第二种类似,采用fanout交换机,每个斲丧者创建一个专用的queue。但是没有借助数据库,而是通过访问rabbitMQ的API接口,获取这三个队列的所有斲丧者的IP放到list中,斲丧者监听到消息后,判定本身的ip是否是ip聚集里面的最小值,如果是则斲丧,如果否则抛弃消息。一旦最小IP的斲丧者宕机后,则list种就会只剩下两个IP,后续的消息选定的斲丧者就会从这两个IP中选择最小IP斲丧。同理它也有第二种方案的缺点。
最后附上通过rabbitmq的api获取minIP的代码(入参consumerIps是初始size=0的list),如下:
  1. private String findUsefulMinIP(List<String> consumerIps) {
  2.         String minIp = null;
  3.         SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
  4.         requestFactory.setConnectTimeout(20000);
  5.         try {
  6.             RestTemplate rest = new RestTemplateBuilder().basicAuthentication(username, password).build();
  7.             rest.setRequestFactory(requestFactory);
  8.             JSONArray result2 = rest.getForObject(moccMQApiUrl, JSONArray.class);
  9.             if(result2 != null && result2.size() > 0) {
  10.                 log.info("===clear the ips===new query start===");
  11.                 consumerIps.clear();
  12.             }
  13.             for(int m=0; m<result2.size(); m++) {
  14.                 LinkedHashMap itmap = (LinkedHashMap) result2.get(m);
  15.                 LinkedHashMap queueMap = (LinkedHashMap)itmap.get("queue");
  16.                 if(!queueMap.values().stream().anyMatch(v -> v.toString().indexOf(moccQueue)>=0)) {
  17.                    continue;
  18.                 }
  19.                
  20.                 LinkedHashMap consumerMap = (LinkedHashMap)itmap.get("channel_details");
  21.                 consumerIps.add((String)consumerMap.get("peer_host"));
  22.             }
  23.             log.info("===query from mq===consumerIps={}", consumerIps);
  24.         } catch (RestClientException e) {
  25.             log.error(e.getMessage(), e);
  26.         }
  27.         minIp = Collections.min(consumerIps);
  28.         return minIp;
  29.     }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

星球的眼睛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表