golang rocketmq包管数据一致性(以电商订单为例)

打印 上一主题 下一主题

主题 887|帖子 887|积分 2661

数据一致性的原因

1.所有情况
a):当地订单信息创建失败,库存扣减成功(一致)
b):当地订单信息新建失败,但是库存扣减成功(不一致)
c):当地订单信息新建成功,但是库存扣减失败(不一致)
d):当地订单信息新建失败,库存扣减失败(一致)
当地数据库直接通过数据库事务就能完成,但是分布式体系跨微服务就很复杂
2.大概会出现的标题

a):库存扣减成功,当地执行失败-调用库存的归还接口(磁盘满了导致服务宕机、网络出标题调用失败-重试到成功-网络标题比力复杂:抖动、拥塞导致过了一段时间之后才会被接收到?-会不会导致重复归还-幂等性标题)
b):非当地执行失败,而是当地代码非常(挂掉),不知道当地执行情况

a):扣减库存发送出去,当网络壅闭,重试三次后哀求回滚
b):扣减库存发送出去,在库存服务返回之前当地事务宕机
TCC解决库存扣减的标题

confirm或者cancel的时候都大概会宕机,但是下一次启动的时候可以读取日志继续没有完成的逻辑,实现比力复杂,java有seta库,golang有go-seta但不支持grpc

基于可靠消息终极一致性方案

基于可靠消息本质上解决了一个标题:可靠消息(消耗者应该确保这个消息能被精确消耗,即库存服务代码要可靠而且肯定要执行,代码有bug或者宕机都能解决)
积分服务没有标题,但是库存服务大概存在库存不足,得答应失败(TCC比力合理一些,而且TCC并发没有可靠消息并发高)

改进执行库存后执行当地事务,关注库存扣减成功后出现的那种非常,扣减失败直接不执行

如何通过基于可靠消息实现库存的一致性?
1.订单信息准备一个归还库存的half消息
2.订单服务通过grpc调用库存服务
3.假如失败直接rollback不发送,什么事变都没有发生;假如成功执行当地事务
4.当地事务执行成功,通过rollback不发送1准备的消息;当地事务假如失败,通过commit发送1准备的消息通过调用reback逻辑归还库存
5.通过commit发送1准备的消息通过调用reback逻辑归还库存
到此正常的逻辑已经结束,第六七步是宕机或者步调堕落的回查
6.假如非常步调挂掉,就回查当地逻辑
7.假如没有查询到当地信息就通过commit发送1准备的消息通过调用reback逻辑归还库存

潜在标题:下单但不付出-设置超时归还订单
改进:通过库存服务监听延时消息确保超时归还

发送端逻辑
构造OrderListener
  1. type OrderListener struct{}
  2. // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  3. func (o *OrderListener) ExecuteLocalTransaction(addr *primitive.Message) primitive.LocalTransactionState {
  4.         fmt.Println("开始执行本地逻辑")
  5.         time.Sleep(time.Second * 3)
  6.         fmt.Println("执行本地逻辑失败")
  7.         //本地执行逻辑无缘无故失败 代码异常 宕机
  8.         return primitive.UnknowState
  9. }
  10. // When no response to prepare(half) message. broker will send check message to check the transaction status, and this
  11. // method will be invoked to get local transaction status.
  12. func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
  13.         fmt.Println("rocketmq的消息回查")
  14.         time.Sleep(time.Second * 15)
  15.         return primitive.CommitMessageState
  16. }
复制代码
创建订单API
  1. func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
  2.         orderListener := OrderListener{}
  3.         p, err := rocketmq.NewTransactionProducer(
  4.                 &orderListener,
  5.                 producer.WithNameServer([]string{"IP:PORT"}),
  6.         )
  7.         if err != nil {
  8.                 // panic("生成producer失败")
  9.                 zap.S().Errorf("生成producer失败:%s", err.Error())
  10.                 return nil, err
  11.         }
  12.         if err = p.Start(); err != nil {
  13.                 // panic("启动producer失败")
  14.                 zap.S().Errorf("启动producer失败:%s", err.Error())
  15.                 return nil, err
  16.         }
  17.         order := model.OrderInfo{
  18.                 OrderSn:      GenerateOrderSn(req.UserId),
  19.                 Address:      req.Address,
  20.                 SignerName:   req.Name,
  21.                 SingerMobile: req.Mobile,
  22.                 Post:         req.Post,
  23.                 User:         req.UserId,
  24.         }
  25.         //应该在消息中具体指名一个具体的商品的扣减情况
  26.         jsonString, _ := json.Marshal(order)
  27.         _, err = p.SendMessageInTransaction(context.Background(),
  28.                 primitive.NewMessage("order_reback", jsonString))
  29.         if err != nil {
  30.                 fmt.Printf("发送失败:%s\n", err)
  31.                 return nil, status.Error(codes.Internal, "发送消息失败")
  32.         }
  33.         // 4.本地事务失败,commit
  34.         if orderListener.Code != codes.OK {
  35.                 return nil, status.Error(orderListener.Code, orderListener.Detail)
  36.         }
  37.         return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
  38. }
复制代码
修改OrderListener通过指针的情势回传值
  1. type OrderListener struct {
  2.         Code        codes.Code
  3.         Detail      string
  4.         ID          int32
  5.         OrderAmount float32
  6. }
复制代码
ExecuteLocalTransaction执行当地事务逻辑
  1. // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  2. func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
  3.         var orderInfo model.OrderInfo
  4.         _ = json.Unmarshal(msg.Body, &orderInfo)
  5.         var goodsIds []int32
  6.         var shopCarts []model.ShoppingCart
  7.         goodsNumsMap := make(map[int32]int32)
  8.         if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
  9.                 o.Code = codes.InvalidArgument
  10.                 o.Detail = "没有选中结算的商品"
  11.                 //没扣减回滚,没必要发送
  12.                 return primitive.RollbackMessageState
  13.         }
  14.         for _, shopCart := range shopCarts {
  15.                 goodsIds = append(goodsIds, shopCart.Goods)
  16.                 goodsNumsMap[shopCart.Goods] = shopCart.Nums
  17.         }
  18.         //跨服务调用
  19.         //商品微服务
  20.         goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{
  21.                 Id: goodsIds,
  22.         })
  23.         if err != nil {
  24.                 o.Code = codes.Internal
  25.                 o.Detail = "批量查询商品信息失败"
  26.                 //没扣减回滚,没必要发送
  27.                 return primitive.RollbackMessageState
  28.         }
  29.         var orderAmount float32
  30.         var orderGoods []*model.OrderGoods
  31.         var goodsInvInfo []*proto.GoodsInvInfo
  32.         for _, good := range goods.Data {
  33.                 orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
  34.                 orderGoods = append(orderGoods, &model.OrderGoods{
  35.                         Goods:      good.Id,
  36.                         GoodsName:  good.Name,
  37.                         GoodsImage: good.GoodsFrontImage,
  38.                         GoodsPrice: good.ShopPrice,
  39.                         Nums:       goodsNumsMap[good.Id],
  40.                 })
  41.                 goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
  42.                         GoodsId: good.Id,
  43.                         Num:     goodsNumsMap[good.Id],
  44.                 })
  45.         }
  46.         //跨服务调用库存微服务进行库存扣减
  47.         if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
  48.                 o.Code = codes.ResourceExhausted
  49.                 o.Detail = "扣减库存失败"
  50.                 return primitive.RollbackMessageState
  51.         }
  52.         //生成订单表
  53.         //20241020xxxx
  54.         tx := global.DB.Begin()
  55.         orderInfo.OrderMount = orderAmount
  56.         if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
  57.                 tx.Rollback()
  58.                 o.Code = codes.ResourceExhausted
  59.                 o.Detail = "创建订单失败"
  60.                 return primitive.CommitMessageState
  61.         }
  62.         for _, orderGood := range orderGoods {
  63.                 orderGood.Order = orderInfo.ID
  64.         }
  65.         o.OrderAmount = orderAmount
  66.         o.ID = orderInfo.ID
  67.         //批量插入orderGoods
  68.         if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
  69.                 tx.Rollback()
  70.                 o.Code = codes.Internal
  71.                 o.Detail = "批量插入商品失败"
  72.                 return primitive.CommitMessageState
  73.         }
  74.         if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
  75.                 tx.Rollback()
  76.                 o.Code = codes.Internal
  77.                 o.Detail = "删除购物车记录失败"
  78.                 return primitive.CommitMessageState
  79.         }
  80.         //发送延时消息
  81.         p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
  82.         if err != nil {
  83.                 panic("生成producer失败")
  84.         }
  85.         if err = p.Start(); err != nil {
  86.                 panic("启动producer失败")
  87.         }
  88.         msgs := primitive.NewMessage("order_timeout", msg.Body)
  89.         msgs.WithDelayTimeLevel(16) //延时30mins
  90.         _, err = p.SendSync(context.Background(), msgs)
  91.         if err != nil {
  92.                 zap.S().Errorf("发送延时消息失败:%v\n", err.Error())
  93.                 tx.Rollback()
  94.                 o.Code = codes.Internal
  95.                 o.Detail = "发送延时消息失败"
  96.                 return primitive.CommitMessageState
  97.         }
  98.         if err = p.Shutdown(); err != nil {
  99.                 panic("关闭producer失败")
  100.         }
  101.         //提交事务
  102.         tx.Commit()
  103.         o.Code = codes.OK
  104.         //本地执行逻辑无缘无故失败 代码异常 宕机
  105.         return primitive.RollbackMessageState
  106. }
复制代码
消息回查CheckLocalTransaction
  1. // When no response to prepare(half) message. broker will send check message to check the transaction status, and this
  2. // method will be invoked to get local transaction status.
  3. func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
  4.         var orderInfo model.OrderInfo
  5.         _ = json.Unmarshal(msg.Body, &orderInfo)
  6.         //怎么检查之前的逻辑是否完成
  7.         if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
  8.                 //这里并不能说明内存扣减了
  9.                 return primitive.CommitMessageState
  10.         }
  11.         return primitive.RollbackMessageState
  12. }
复制代码
消耗端逻辑
归还表
  1. type DeliveryHistory struct {
  2.         Goods   int32  `gorm:"type:int;index"`
  3.         Num     int32  `gorm:"type:int"`
  4.         OrderSn string `gorm:"type:varchar(200)"`
  5.         Status  string `gorm:"type:varchar(200)"` //1.代表已扣减.2.代表已归还,3.失败
  6. }
复制代码
表设计比力贫苦,十件商品创建十条信息,1.数据量大2.查询贫苦
改进表为StockSellDetail
  1. type StockSellDetail struct {
  2.         OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique"`
  3.         Status  int             `gorm:"type:varchar(200)"` //1.表示已扣减,2.表示已归还
  4.         Detail  GoodsDetailList `gorm:"type:varchar(200)"` //list类型,包括id和num
  5. }
复制代码
GoodsDetailList定义
  1. type GoodsDetail struct {
  2.         Goods int32
  3.         Num   int32
  4. }
  5. type GoodsDetailList []GoodsDetail
  6. func (g *GoodsDetailList) Scan(value interface{}) error {
  7.         return json.Unmarshal(value.([]byte), &g)
  8. }
  9. func (g GoodsDetailList) Value() (driver.Value, error) {
  10.         return json.Marshal(g)
  11. }
复制代码
通过gorm新建数据表
归还函数
  1. func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
  2.         type OrderInfo struct {
  3.                 OrderSn string
  4.         }
  5.         for i := range msgs {
  6.                 var orderInfo OrderInfo
  7.                 err := json.Unmarshal(msgs[i].Body, &orderInfo)
  8.                 if err != nil {
  9.                         zap.S().Errorf("解析json失败:%v\n", msgs[i].Body)
  10.                         //代码问题 下一次重新发consumer.ConsumeRetryLater
  11.                         //数据问题 忽略
  12.                         return consumer.ConsumeSuccess, nil
  13.                 }
  14.                 //去将inv的库存加回去,将selldetail的status设置为2,要在事务中进行
  15.                 tx := global.DB.Begin()
  16.                 var sellDetail model.StockSellDetail
  17.                 if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {
  18.                         return consumer.ConsumeSuccess, nil
  19.                 }
  20.                 //逐个归还
  21.                 for _, orderGood := range sellDetail.Detail {
  22.                         //先查后改并发容易出问题,直接stocks=stocks+2
  23.                         if result := tx.Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("sotcks+?", orderGood.Num)); result.RowsAffected == 0 {
  24.                                 tx.Rollback()
  25.                                 //检查代码
  26.                                 return consumer.ConsumeRetryLater, nil
  27.                         }
  28.                 }
  29.                 sellDetail.Status = 2
  30.                 if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {
  31.                         tx.Rollback()
  32.                         return consumer.ConsumeRetryLater, nil
  33.                 }
  34.                 tx.Commit()
  35.         }
  36.         return consumer.ConsumeSuccess, nil
  37. }
复制代码
库存服务订阅topic
  1. //监听库存归还topic
  2.         c, err := rocketmq.NewPushConsumer(
  3.                 consumer.WithNameServer([]string{"IP:PORT"}),
  4.                 consumer.WithGroupName("shop"),
  5.         )
  6.         if err != nil {
  7.                 panic("连接失败")
  8.         }
  9.         if err = c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {
  10.                 fmt.Println("读取消息失败")
  11.         }
  12.         _ = c.Start()
  13.         //不能让主goroutine退出
  14.         time.Sleep(time.Hour)
  15.         _ = c.Shutdown()
复制代码
超时归还订阅topic
  1. //订单超时归还topic
  2.         c, err := rocketmq.NewPushConsumer(
  3.                 consumer.WithNameServer([]string{"IP:PORT"}),
  4.                 consumer.WithGroupName("order"),
  5.         )
  6.         if err != nil {
  7.                 panic("连接失败")
  8.         }
  9.         if err = c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {
  10.                 fmt.Println("读取消息失败")
  11.         }
  12.         _ = c.Start()
  13.         //不能让主goroutine退出
  14.         time.Sleep(time.Hour)
  15.         _ = c.Shutdown()
复制代码
超时归还处置惩罚函数
  1. func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
  2.         for i := range msgs {
  3.                 var orderInfo model.OrderInfo
  4.                 _ = json.Unmarshal(msgs[i].Body, &orderInfo)
  5.                 fmt.Printf("获取到消息订单超时消息:%v\n", time.Now())
  6.                 //查询订单的支付状态,未支付归还
  7.                 var order model.OrderInfo
  8.                 if result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {
  9.                         return consumer.ConsumeSuccess, nil
  10.                 }
  11.                 if order.Status != "TRADE_SUCCESS" {
  12.                         //归还库存发送一个消息到order_reback
  13.                         tx := global.DB.Begin()
  14.                         //修改订单的状态已支付
  15.                         order.Status = "TRADE_CLOSE"
  16.                         tx.Save(&order)
  17.                         p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
  18.                         if err != nil {
  19.                                 panic("生成producer失败")
  20.                         }
  21.                         if err = p.Start(); err != nil {
  22.                                 panic("启动producer失败")
  23.                         }
  24.                         res, err := p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))
  25.                         if err != nil {
  26.                                 tx.Rollback()
  27.                                 fmt.Println("发送失败:%s\n", err)
  28.                                 return consumer.ConsumeRetryLater, nil
  29.                         } else {
  30.                                 fmt.Println("发送成功:%s\n", res.String())
  31.                         }
  32.                         tx.Commit()
  33.                         if err = p.Shutdown(); err != nil {
  34.                                 panic("关闭producer失败")
  35.                         }
  36.                 }
  37.         }
  38.         return consumer.ConsumeSuccess, nil
  39. }
复制代码
shutdown导致消耗偏移量offset更新失败

client.ClientID()默认返回历程的pid,而clientMap通过调用client.ClientID()为key创建或加载,等候shutdown时会移除,假如一个历程中多次调用且存在shutdown大概会导致消耗偏移量offset更新失败,进而导致rocketmq堕落。
解决方法
(1)生成clientID不一致
(2)不随便shutdown
注意
不要在一个历程中利用多个producer或者不要随意调用shutdown影响其他producer

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

勿忘初心做自己

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表