灌篮少年 发表于 2024-8-7 03:31:37

RabbitMQ定义的MQ多个consumer重复消耗同一条消息

RabbitMQ定义的MQ多个consumer重复消耗同一条消息

@author:shengfq
@date:2024-07-09
@category: 数据同等性
@title:RabbitMQ定义的MQ多个consumer重复消耗同一条消息
系统环境

direct范例的Exchange,通过routingKey:START/CONFORM绑定到业务队列Queue: START queue.xxx.qm.operationStart 工序开工质量监听队列 CONFORM queue.xxx.qm.operationEnd 工序完工质量监听队列 由于有多个qm服务实例,都监听了上述的queue,以是就有多个consumer. 问题描述

【2010工厂,生产环境,订单数据】 订单号 880006861916 订单状态 130-在制 订单范例 ZF01-成品订单 物料编码 130901000711B 同一个订单同一个工序生成了2个任务(查验计划里面也只有一个工序的查验规格呢) [图片] TKE日志如下,根本上通过requestId可以判断是同一条消息被多个consumer消耗了.

2024-07-05 10:41:10projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-62sxdpathstdoutlog_data2024-07-05 10:41:10.765 WARN [-,,,] 1 --- c.s.m.q.t.s.h.OperationStartReceiver : OperationStartReceiver msg:{"oprName":"打胶-玻璃装配","scrappedQuantity":0,"materialNo":"130901000711B","wipOrderNo":"880006861916","oprSequenceNo":"0041","goodQuantity":0,"operationWorkStationName":"打胶-玻璃装配","standOprName":"人工组装","wipOrderOperationId":596309434920902660,"requestId":"2b7eaabea93843dcb9bf6838f8da1ca3","workCenterId":579597295600041993,"progressStatus":130,"facilityCode":"2010","facilityId":571301815799623680,"operationWorkStationCode":"2010JS01A0002","completeSystem":"IMOM","operationWorkStationId":579600364052193290,"orderPlanQuantity":1,"materialId":575618864008929280,"creationDate":1720147195174,"workCenterCode":"303910","wipOrderId":596309434912514048,"createdBy":39020,"workType":0,"tenantId":2,"operationType":"START","standSequenceNo":"ZP00009","oprSerialNo":"000000"} UserDetails:CustomUserDetails{userId=39020, username=hecf6, realName='何超锋', email='null', timeZone='null', language='zh_CN', roleId=null, roleIds=null, siteRoleIds=null, tenantRoleIds=null, roleMergeFlag=null, tenantId=2, tenantNum='null', tenantIds=null, imageUrl='null', organizationId=2, isAdmin=null, clientId=null, clientName='null', clientAuthorizedGrantTypes=null, clientResourceIds=null, clientScope=null, clientRegisteredRedirectUri=null, clientAccessTokenValiditySeconds=null, clientRefreshTokenValiditySeconds=null, clientAutoApproveScopes=null, additionInfo={"facility_ids":"571301815799623680","employee_no":"80033656","account_no":"hecf6","facility_name":"2010-华湘工厂","facility_no":"2010","facility_id":"571301815799623680","phone_number":"15062676297","domain_account":"hecf6"}, apiEncryptFlag=null, apiReplayFlag=null, menuIdFlag=null, roleLabels='null}

2024-07-05 10:41:10projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-62sxdpathstdoutlog_data2024-07-05 10:41:10.766 INFO [-,,,] 1 --- .s.i.BaseInspectionTaskCreateServiceImpl : 开始创建检验任务: wipOrderNo:880006861916 materialNo:130901000711B type:01,02,04

2024-07-05 10:41:11projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-jpxkbpathstdoutlog_data2024-07-05 10:41:10.783 INFO [-,,,] 1 --- i.InspectionTaskCreateServiceImplProduce : handle additional field in produce: {"objectVersionNumber":1,"oprName":"打胶-玻璃装配","workStationId":579600364052193290,"materialNo":"130901000711B","lastUpdateDate":1720147270779,"_token":"KAGDNz23HetMDAfJJq/9LYpnI/R6ia9Qcr0KvO1kMzQFdY55i+FzAtP46AJMmH1KyuXjf3zheBwdLiwS6ULciHXbNz74wk0jWRr7otOcfXdCgONlfVsDBuyLU6ij1OJZ","productionLineName":"驾驶室装配线","sampleNo":"ZJYB0705018750001","isPassBack":false,"wipOrderNo":"880006861916","oprSequenceNo":"0041","taskExecutionNumber":0,"isFirst":false,"businessNo":"","taskType":"01","wipOrderOperationId":596309434920902660,"workCenterId":579597295600041993,"flex":{},"supplier":"","wipOperationId":596309434920902660,"wipOrderNoReverse":"619168600088","taskNo":"JYZJ2024070501875","id":597014172781752320,"inspectionPlanId":583340904417832960,"workStationCode":"2010JS01A0002","productionLineId":578627703880851457,"lastUpdatedBy":39020,"facilityId":571301815799623680,"quantity":1,"measureUnit":"","materialId":575618864008929280,"creationDate":1720147270779,"exeStatus":false,"inspectionPlanVersion":"2","deleted":false,"workCenterCode":"303910","recordUnqualifiedNumber":false,"createdBy":39020,"productionLineCode":"2010JS01","tenantId":2,"isDevice":false,"businessType":"01","workStationName":"打胶-玻璃装配","status":"2","oprSerialNo":"000000"}
=================================
2024-07-05 10:41:11projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-jpxkbpathstdoutlog_data2024-07-05 10:41:10.697 WARN [-,,,] 1 --- c.s.m.q.t.s.h.OperationStartReceiver : OperationStartReceiver msg:{"oprName":"打胶-玻璃装配","scrappedQuantity":0,"materialNo":"130901000711B","wipOrderNo":"880006861916","oprSequenceNo":"0041","goodQuantity":0,"operationWorkStationName":"打胶-玻璃装配","standOprName":"人工组装","wipOrderOperationId":596309434920902660,"requestId":"2b7eaabea93843dcb9bf6838f8da1ca3","workCenterId":579597295600041993,"progressStatus":130,"facilityCode":"2010","facilityId":571301815799623680,"operationWorkStationCode":"2010JS01A0002","completeSystem":"IMOM","operationWorkStationId":579600364052193290,"orderPlanQuantity":1,"materialId":575618864008929280,"creationDate":1720147195174,"workCenterCode":"303910","wipOrderId":596309434912514048,"createdBy":39020,"workType":0,"tenantId":2,"operationType":"START","standSequenceNo":"ZP00009","oprSerialNo":"000000"} UserDetails:CustomUserDetails{userId=39020, username=hecf6, realName='何超锋', email='null', timeZone='null', language='zh_CN', roleId=null, roleIds=null, siteRoleIds=null, tenantRoleIds=null, roleMergeFlag=null, tenantId=2, tenantNum='null', tenantIds=null, imageUrl='null', organizationId=2, isAdmin=null, clientId=null, clientName='null', clientAuthorizedGrantTypes=null, clientResourceIds=null, clientScope=null, clientRegisteredRedirectUri=null, clientAccessTokenValiditySeconds=null, clientRefreshTokenValiditySeconds=null, clientAutoApproveScopes=null, additionInfo={"facility_ids":"571301815799623680","employee_no":"80033656","account_no":"hecf6","facility_name":"2010-华湘工厂","facility_no":"2010","facility_id":"571301815799623680","phone_number":"15062676297","domain_account":"hecf6"}, apiEncryptFlag=null, apiReplayFlag=null, menuIdFlag=null, roleLabels='null}

2024-07-05 10:41:11projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-jpxkbpathstdoutlog_data2024-07-05 10:41:10.698 INFO [-,,,] 1 --- .s.i.BaseInspectionTaskCreateServiceImpl : 开始创建检验任务: wipOrderNo:880006861916 materialNo:130901000711B type:01,02,04

2024-07-05 10:41:11projectitrcp20220217183240276namespacekhoros-mom-prodpodmom-service-qm-9a049-7665f4899b-62sxdpathstdoutlog_data2024-07-05 10:41:10.873 INFO [-,,,] 1 --- i.InspectionTaskCreateServiceImplProduce : handle additional field in produce: {"objectVersionNumber":1,"oprName":"打胶-玻璃装配","workStationId":579600364052193290,"materialNo":"130901000711B","lastUpdateDate":1720147270864,"_token":"KAGDNz23HetMDAfJJq/9LYpnI/R6ia9Qcr0KvO1kMzQFdY55i+FzAtP46AJMmH1KyuXjf3zheBwdLiwS6ULciHXbNz74wk0jWRr7otOcfXdZr1IoJ3ydQQ21EW6nNQiq","productionLineName":"驾驶室装配线","sampleNo":"ZJYB0705018760001","isPassBack":false,"wipOrderNo":"880006861916","oprSequenceNo":"0041","taskExecutionNumber":0,"isFirst":false,"businessNo":"","taskType":"01","wipOrderOperationId":596309434920902660,"workCenterId":579597295600041993,"flex":{},"supplier":"","wipOperationId":596309434920902660,"wipOrderNoReverse":"619168600088","taskNo":"JYZJ2024070501876","id":597014173138264064,"inspectionPlanId":583340904417832960,"workStationCode":"2010JS01A0002","productionLineId":578627703880851457,"lastUpdatedBy":39020,"facilityId":571301815799623680,"quantity":1,"measureUnit":"","materialId":575618864008929280,"creationDate":1720147270864,"exeStatus":false,"inspectionPlanVersion":"2","deleted":false,"workCenterCode":"303910","recordUnqualifiedNumber":false,"createdBy":39020,"productionLineCode":"2010JS01","tenantId":2,"isDevice":false,"businessType":"01","workStationName":"打胶-玻璃装配","status":"2","oprSerialNo":"000000"}

为什么会出现多个consumer对同一条消息重复消耗呢?

通过日志排查,同一消息队列的消息被重复消耗,固然创建查验任务有查重的判断,但是在多实例同时消耗时,在非常短的时间内,数据落表和查重存在幻读情况(查询数据库还消灭表成功时的状态),因此这种单历程级别的查重在分布式场景中根本无效,会有概率性的失败.
根据rabbitmq的原理,如果是direct范例的exchange,程度扩展的微服务多实例使用相同的routingkey绑定到同一个队列进行消耗,其消耗模式是 竞争消耗者模式:多个消耗者实例监听同一个队列,但每条消息只会被一个消耗者实例消耗。RabbitMQ使用公平分发(round-robin)的计谋来选择消耗者,但也可以通过设置消耗者优先级或预取计数(prefetch count)来影响分发计谋.
因此在这种情况下,消耗者是竞争消耗,当没有及时进行ACK应答时,就有大概率存在重复消耗,就会存在同一消息被多次消耗的大概情况.生产环境的日志证实白这一点.
问题的根源

消息处理的幂等性,无论是多个实例consumer消费同一条消息,在要求幂等的接口场景,就要做幂等性的代码实现,不能想当然认为这大概率不会发生,即使发生了1次,也是系统缺陷,要么做线性消费,要么做幂等消息处理.
解决办法:

思绪1.让多个consumer监听器线性消耗并让client端主动ACK.

//application.yml 增加自动应答
spring:
    rabbitmq:
      host:xxx
      port:xxx
      username:xx
      password:xx
      listener:
          simple:
            acknowledge-mode: auto
            prefectch: 1
            retry:
                enabled: true

//消息队列初始化为单活消费者模式
初始化queue时,设置client为单活消费者模式
HashMap<String,Object> args=new HashMap();
args.put("x-single-active-consumer",true);
amqpAdmin.declareQueue(new Queue(xx,xx,xx,args));

配置完成后,删除该队列,qm服务主动重建队列,配置完后可以查看到该消息队列的consumer中,处于active的只有一个.
思绪2. 消息幂等处理惩罚

MQ消耗者的幂等性一般使用全局ID大概写个唯一标识(比如流水号/时间戳/UUID/订单号)来判断该消息是否已消耗过,也可以使用redis执行setnx下令,天然具有幂等性,从而实现不重复消耗(保举使用redis)。
也就是在消耗者业务逻辑处理惩罚时,先辈行全局加锁,不外这种情况对接口响应性能有压力,而且依靠于Redis的稳定性,如果redis崩溃了,接口就无法正常服务.基于现在企业内的实际情况,应该可以做出取舍.
终极选择

接纳思绪1实现方式,设置简单,影响范围小,不依靠Redis组件.
在sit环境,启动多个qm实例模拟生产环境消耗消息,实践证实,没有再出现重复消耗消息的情况.
补丁包分支:bugfix-62-imom-28381
缺陷:imom-28383
生产修复:
手动删除队列:
xxx 工序开工质量监听队列
xxx 工序完工质量监听队列
参考资料
微服务架构数据同等性分析
RabbitMQ重复消耗

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ定义的MQ多个consumer重复消耗同一条消息