数据一致性的原因
1.所有情况
a):当地订单信息创建失败,库存扣减成功(一致)
b):当地订单信息新建失败,但是库存扣减成功(不一致)
c):当地订单信息新建成功,但是库存扣减失败(不一致)
d):当地订单信息新建失败,库存扣减失败(一致)
当地数据库直接通过数据库事务就能完成,但是分布式体系跨微服务就很复杂
2.大概会出现的标题
a):库存扣减成功,当地执行失败-调用库存的归还接口(磁盘满了导致服务宕机、网络出标题调用失败-重试到成功-网络标题比力复杂:抖动、拥塞导致过了一段时间之后才会被接收到?-会不会导致重复归还-幂等性标题)
b):非当地执行失败,而是当地代码非常(挂掉),不知道当地执行情况
a):扣减库存发送出去,当网络壅闭,重试三次后哀求回滚
b):扣减库存发送出去,在库存服务返回之前当地事务宕机
TCC解决库存扣减的标题
confirm或者cancel的时候都大概会宕机,但是下一次启动的时候可以读取日志继续没有完成的逻辑,实现比力复杂,java有seta库,golang有go-seta但不支持grpc
基于可靠消息终极一致性方案
基于可靠消息本质上解决了一个标题:可靠消息(消耗者应该确保这个消息能被精确消耗,即库存服务代码要可靠而且肯定要执行,代码有bug或者宕机都能解决)
积分服务没有标题,但是库存服务大概存在库存不足,得答应失败(TCC比力合理一些,而且TCC并发没有可靠消息并发高)
改进执行库存后执行当地事务,关注库存扣减成功后出现的那种非常,扣减失败直接不执行
如何通过基于可靠消息实现库存的一致性?
1.订单信息准备一个归还库存的half消息
2.订单服务通过grpc调用库存服务
3.假如失败直接rollback不发送,什么事变都没有发生;假如成功执行当地事务
4.当地事务执行成功,通过rollback不发送1准备的消息;当地事务假如失败,通过commit发送1准备的消息通过调用reback逻辑归还库存
5.通过commit发送1准备的消息通过调用reback逻辑归还库存
到此正常的逻辑已经结束,第六七步是宕机或者步调堕落的回查
6.假如非常步调挂掉,就回查当地逻辑
7.假如没有查询到当地信息就通过commit发送1准备的消息通过调用reback逻辑归还库存
潜在标题:下单但不付出-设置超时归还订单
改进:通过库存服务监听延时消息确保超时归还
发送端逻辑
构造OrderListener
- type OrderListener struct{}
- // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
- func (o *OrderListener) ExecuteLocalTransaction(addr *primitive.Message) primitive.LocalTransactionState {
- fmt.Println("开始执行本地逻辑")
- time.Sleep(time.Second * 3)
- fmt.Println("执行本地逻辑失败")
- //本地执行逻辑无缘无故失败 代码异常 宕机
- return primitive.UnknowState
- }
- // When no response to prepare(half) message. broker will send check message to check the transaction status, and this
- // method will be invoked to get local transaction status.
- func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
- fmt.Println("rocketmq的消息回查")
- time.Sleep(time.Second * 15)
- return primitive.CommitMessageState
- }
复制代码 创建订单API
- func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
- orderListener := OrderListener{}
- p, err := rocketmq.NewTransactionProducer(
- &orderListener,
- producer.WithNameServer([]string{"IP:PORT"}),
- )
- if err != nil {
- // panic("生成producer失败")
- zap.S().Errorf("生成producer失败:%s", err.Error())
- return nil, err
- }
- if err = p.Start(); err != nil {
- // panic("启动producer失败")
- zap.S().Errorf("启动producer失败:%s", err.Error())
- return nil, err
- }
- order := model.OrderInfo{
- OrderSn: GenerateOrderSn(req.UserId),
- Address: req.Address,
- SignerName: req.Name,
- SingerMobile: req.Mobile,
- Post: req.Post,
- User: req.UserId,
- }
- //应该在消息中具体指名一个具体的商品的扣减情况
- jsonString, _ := json.Marshal(order)
- _, err = p.SendMessageInTransaction(context.Background(),
- primitive.NewMessage("order_reback", jsonString))
- if err != nil {
- fmt.Printf("发送失败:%s\n", err)
- return nil, status.Error(codes.Internal, "发送消息失败")
- }
- // 4.本地事务失败,commit
- if orderListener.Code != codes.OK {
- return nil, status.Error(orderListener.Code, orderListener.Detail)
- }
- return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
- }
复制代码 修改OrderListener通过指针的情势回传值
- type OrderListener struct {
- Code codes.Code
- Detail string
- ID int32
- OrderAmount float32
- }
复制代码 ExecuteLocalTransaction执行当地事务逻辑
- // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
- func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
- var orderInfo model.OrderInfo
- _ = json.Unmarshal(msg.Body, &orderInfo)
- var goodsIds []int32
- var shopCarts []model.ShoppingCart
- goodsNumsMap := make(map[int32]int32)
- if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
- o.Code = codes.InvalidArgument
- o.Detail = "没有选中结算的商品"
- //没扣减回滚,没必要发送
- return primitive.RollbackMessageState
- }
- for _, shopCart := range shopCarts {
- goodsIds = append(goodsIds, shopCart.Goods)
- goodsNumsMap[shopCart.Goods] = shopCart.Nums
- }
- //跨服务调用
- //商品微服务
- goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{
- Id: goodsIds,
- })
- if err != nil {
- o.Code = codes.Internal
- o.Detail = "批量查询商品信息失败"
- //没扣减回滚,没必要发送
- return primitive.RollbackMessageState
- }
- var orderAmount float32
- var orderGoods []*model.OrderGoods
- var goodsInvInfo []*proto.GoodsInvInfo
- for _, good := range goods.Data {
- orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
- orderGoods = append(orderGoods, &model.OrderGoods{
- Goods: good.Id,
- GoodsName: good.Name,
- GoodsImage: good.GoodsFrontImage,
- GoodsPrice: good.ShopPrice,
- Nums: goodsNumsMap[good.Id],
- })
- goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
- GoodsId: good.Id,
- Num: goodsNumsMap[good.Id],
- })
- }
- //跨服务调用库存微服务进行库存扣减
- if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
- o.Code = codes.ResourceExhausted
- o.Detail = "扣减库存失败"
- return primitive.RollbackMessageState
- }
- //生成订单表
- //20241020xxxx
- tx := global.DB.Begin()
- orderInfo.OrderMount = orderAmount
- if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
- tx.Rollback()
- o.Code = codes.ResourceExhausted
- o.Detail = "创建订单失败"
- return primitive.CommitMessageState
- }
- for _, orderGood := range orderGoods {
- orderGood.Order = orderInfo.ID
- }
- o.OrderAmount = orderAmount
- o.ID = orderInfo.ID
- //批量插入orderGoods
- if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
- tx.Rollback()
- o.Code = codes.Internal
- o.Detail = "批量插入商品失败"
- return primitive.CommitMessageState
- }
- if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
- tx.Rollback()
- o.Code = codes.Internal
- o.Detail = "删除购物车记录失败"
- return primitive.CommitMessageState
- }
- //发送延时消息
- p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
- if err != nil {
- panic("生成producer失败")
- }
- if err = p.Start(); err != nil {
- panic("启动producer失败")
- }
- msgs := primitive.NewMessage("order_timeout", msg.Body)
- msgs.WithDelayTimeLevel(16) //延时30mins
- _, err = p.SendSync(context.Background(), msgs)
- if err != nil {
- zap.S().Errorf("发送延时消息失败:%v\n", err.Error())
- tx.Rollback()
- o.Code = codes.Internal
- o.Detail = "发送延时消息失败"
- return primitive.CommitMessageState
- }
- if err = p.Shutdown(); err != nil {
- panic("关闭producer失败")
- }
- //提交事务
- tx.Commit()
- o.Code = codes.OK
- //本地执行逻辑无缘无故失败 代码异常 宕机
- return primitive.RollbackMessageState
- }
复制代码 消息回查CheckLocalTransaction
- // When no response to prepare(half) message. broker will send check message to check the transaction status, and this
- // method will be invoked to get local transaction status.
- func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
- var orderInfo model.OrderInfo
- _ = json.Unmarshal(msg.Body, &orderInfo)
- //怎么检查之前的逻辑是否完成
- if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
- //这里并不能说明内存扣减了
- return primitive.CommitMessageState
- }
- return primitive.RollbackMessageState
- }
复制代码 消耗端逻辑
归还表
- type DeliveryHistory struct {
- Goods int32 `gorm:"type:int;index"`
- Num int32 `gorm:"type:int"`
- OrderSn string `gorm:"type:varchar(200)"`
- Status string `gorm:"type:varchar(200)"` //1.代表已扣减.2.代表已归还,3.失败
- }
复制代码 表设计比力贫苦,十件商品创建十条信息,1.数据量大2.查询贫苦
改进表为StockSellDetail
- type StockSellDetail struct {
- OrderSn string `gorm:"type:varchar(200);index:idx_order_sn,unique"`
- Status int `gorm:"type:varchar(200)"` //1.表示已扣减,2.表示已归还
- Detail GoodsDetailList `gorm:"type:varchar(200)"` //list类型,包括id和num
- }
复制代码 GoodsDetailList定义
- type GoodsDetail struct {
- Goods int32
- Num int32
- }
- type GoodsDetailList []GoodsDetail
- func (g *GoodsDetailList) Scan(value interface{}) error {
- return json.Unmarshal(value.([]byte), &g)
- }
- func (g GoodsDetailList) Value() (driver.Value, error) {
- return json.Marshal(g)
- }
复制代码 通过gorm新建数据表
归还函数
- func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
- type OrderInfo struct {
- OrderSn string
- }
- for i := range msgs {
- var orderInfo OrderInfo
- err := json.Unmarshal(msgs[i].Body, &orderInfo)
- if err != nil {
- zap.S().Errorf("解析json失败:%v\n", msgs[i].Body)
- //代码问题 下一次重新发consumer.ConsumeRetryLater
- //数据问题 忽略
- return consumer.ConsumeSuccess, nil
- }
- //去将inv的库存加回去,将selldetail的status设置为2,要在事务中进行
- tx := global.DB.Begin()
- var sellDetail model.StockSellDetail
- if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {
- return consumer.ConsumeSuccess, nil
- }
- //逐个归还
- for _, orderGood := range sellDetail.Detail {
- //先查后改并发容易出问题,直接stocks=stocks+2
- if result := tx.Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("sotcks+?", orderGood.Num)); result.RowsAffected == 0 {
- tx.Rollback()
- //检查代码
- return consumer.ConsumeRetryLater, nil
- }
- }
- sellDetail.Status = 2
- if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {
- tx.Rollback()
- return consumer.ConsumeRetryLater, nil
- }
- tx.Commit()
- }
- return consumer.ConsumeSuccess, nil
- }
复制代码 库存服务订阅topic
- //监听库存归还topic
- c, err := rocketmq.NewPushConsumer(
- consumer.WithNameServer([]string{"IP:PORT"}),
- consumer.WithGroupName("shop"),
- )
- if err != nil {
- panic("连接失败")
- }
- if err = c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {
- fmt.Println("读取消息失败")
- }
- _ = c.Start()
- //不能让主goroutine退出
- time.Sleep(time.Hour)
- _ = c.Shutdown()
复制代码 超时归还订阅topic
- //订单超时归还topic
- c, err := rocketmq.NewPushConsumer(
- consumer.WithNameServer([]string{"IP:PORT"}),
- consumer.WithGroupName("order"),
- )
- if err != nil {
- panic("连接失败")
- }
- if err = c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {
- fmt.Println("读取消息失败")
- }
- _ = c.Start()
- //不能让主goroutine退出
- time.Sleep(time.Hour)
- _ = c.Shutdown()
复制代码 超时归还处置惩罚函数
- func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
- for i := range msgs {
- var orderInfo model.OrderInfo
- _ = json.Unmarshal(msgs[i].Body, &orderInfo)
- fmt.Printf("获取到消息订单超时消息:%v\n", time.Now())
- //查询订单的支付状态,未支付归还
- var order model.OrderInfo
- if result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {
- return consumer.ConsumeSuccess, nil
- }
- if order.Status != "TRADE_SUCCESS" {
- //归还库存发送一个消息到order_reback
- tx := global.DB.Begin()
- //修改订单的状态已支付
- order.Status = "TRADE_CLOSE"
- tx.Save(&order)
- p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
- if err != nil {
- panic("生成producer失败")
- }
- if err = p.Start(); err != nil {
- panic("启动producer失败")
- }
- res, err := p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))
- if err != nil {
- tx.Rollback()
- fmt.Println("发送失败:%s\n", err)
- return consumer.ConsumeRetryLater, nil
- } else {
- fmt.Println("发送成功:%s\n", res.String())
- }
- tx.Commit()
- if err = p.Shutdown(); err != nil {
- panic("关闭producer失败")
- }
- }
- }
- return consumer.ConsumeSuccess, nil
- }
复制代码 shutdown导致消耗偏移量offset更新失败
client.ClientID()默认返回历程的pid,而clientMap通过调用client.ClientID()为key创建或加载,等候shutdown时会移除,假如一个历程中多次调用且存在shutdown大概会导致消耗偏移量offset更新失败,进而导致rocketmq堕落。
解决方法
(1)生成clientID不一致
(2)不随便shutdown
注意
不要在一个历程中利用多个producer或者不要随意调用shutdown影响其他producer
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |