ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Dapr Outbox 执行流程
[打印本页]
作者:
郭卫东
时间:
2024-5-18 02:07
标题:
Dapr Outbox 执行流程
Dapr Outbox 是1.12中的功能。
本文只介绍Dapr Outbox 执行流程,Dapr Outbox基本用法请阅读
官方文档
。本文中appID=order-processor,topic=orders
本文前提知识:认识Dapr状态管理、Dapr发布订阅和Outbox 模式。
Outbox 模式的核心是在同一个数据库事件中生存业务数据和待发布的事件消息,再由某个“定时任务”读取待发布的事件消息并发布事件(并删除数据库中事件消息)
相关文章:
.NET中实现Outbox模式的框架CAP,作者Savorboard
使用 dotnetcore/CAP 的本地消息表模式,圣杰
先在内部发布一个主题(topic)
要使用Dapr Outbox,在.NET中就是调用DaprClient的ExecuteStateTransactionAsync(...)方法(得先完成Outbox相关的配置!),调用此方法会完成事件操作(生存业务数据和待发布的事件消息)并发布事件消息。
string DAPR_STORE_NAME = "statestoresql";
var client = new DaprClientBuilder().Build();
var orderId = 1;
var order = new Order(orderId);
var bytes = JsonSerializer.SerializeToUtf8Bytes(order);
var upsert = new List<StateTransactionRequest>()
{
new StateTransactionRequest(orderId.ToString(), bytes, StateOperationType.Upsert)
};
// 保存状态,并发布事件消息
await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, upsert);
public record Order([property: JsonPropertyName("orderId")] int orderId);
复制代码
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: orderpubsub # 发布订阅组件
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
复制代码
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestoresql # 状态组件
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:mysecret@tcp(localhost:3306)/?allowNativePasswords=true"
- name: outboxPublishPubsub
value: orderpubsub
- name: outboxPublishTopic
value: orders
复制代码
调用ExecuteStateTransactionAsync(...)方法时,此方法把请求转发给sidecar,sidecar会发布一个内部主题。所谓内部,就是供Dapr使用,用户不消操作;所谓主题(Topic)就是一个事件;此主题格式为:namespace + appID + topic + "outbox" ,假设appID=order-processor,topic=orders,则内部主题(Topic)名就是order-processorordersoutbox(namespace 是与k8s有关),此主题用于判断事件是否执行乐成。
注:该内部主题(topic)默认和事件消息使用同一个Dapr发布/订阅组件,可以通过配置状态组件的元数据(metadata配置)字段outboxPubsub单独指定内部主题所使用的发布/订阅组件。相关配置请看
官方文档
主题内容
是CloudEvent格式,发布的事件数据如下(真正的待发布事件消息就是json中的data字段,后面就是读取的此值):
{
"data":"{"orderId":1}",
"datacontenttype":"text/plain",
"id":"outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0",
"pubsubname":"orderpubsub",
"source":"order-processor",
"specversion":"1.0",
"time":"2024-01-25T17:12:31+08:00",
"topic":"",
"traceid":"",
"traceparent":"",
"tracestate":"",
"type":"com.dapr.event.sent"
}
复制代码
有了事件的发布者,那事件的订阅者是谁呢?
appID=order-processor的Dapr sidecar实例
。可以是执行生存状态的sidecar程序,或者是appID=order-processor的其他sidecar。
在同一事件中生存状态和事件消息
在内部主题(Topic)发布乐成后
,会在同一事件中生存状态和事件消息,也就是将方法client.ExecuteStateTransactionAsync(...)中的数据生存到数据库。id为outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0的表示需待发布事件消息,id为order-processor||1表示状态数据。事件消息和状态数据生存在同一张表state中,在mysql中其表结构和数据如下所示。
如果此内部主题(Topic)发布
失败
,调用方直接抛非常,不会执行事件操作!state表不会有下面两条数据。
"eyJvcmRlcklkIjoxfQ=="既是状态数据又是待发布的事件数据;经过Base64解码,得到该值为json格式,即:{"orderId":1}
CREATE TABLE `state` (
`id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`value` json NOT NULL,
`isbinary` tinyint(1) NOT NULL,
`insertDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`eTag` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`expiredate` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `expiredate_idx`(`expiredate` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
复制代码
idvalueisbinaryinsertDateupdateDateeTagexpiredateoutbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0"0"02024-01-25 09:22:142024-01-25 09:22:1407884eed-eb5d-4887-8399-051c71206ed5order-processor||1"eyJvcmRlcklkIjoxfQ=="12024-01-25 09:12:312024-01-25 09:22:143d1e368f-f6d8-4ccd-946d-c10090c7cc42
内部主题(Topic)的订阅者发布事件消息
数据库事件执行乐成后,什么时候把事件消息发布出去呢?
把事件消息发布出去是在内部主题(Topic)的订阅者中实现的,具体如下:
步调X:appID为order-processor的sidecar接收到内部主题(Topic)发送的事件,然后通过查询判断id为outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0的数据是否存在?
如果存在,表示状态数据和事件消息都已生存在mysql中,则发布事件消息(事件数据就前面提到的data字段)。事件发布乐成后,则删除id为outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0的记录。
如果不存在就直接退出,停止后续操作;事件的订阅者会多次收到订阅消息,即重复步调X过程。
这里会有一个问题:接收到内部主题(Topic)后,状态和事件消息大概没有持久化到mysql(前面提到过,Dapr sidecar是先发布一个内部主题,再在同一事件中生存状态和事件消息)。所以获取状态执行以下重试计谋。删除状态时也是此重试计谋。
bo := &backoff.ExponentialBackOff{
InitialInterval: time.Millisecond * 500,// 初始间隔
MaxInterval: time.Second * 3, // 最大间隔。重试时间超过此值时,以此值为准
MaxElapsedTime: time.Second * 10, // 累计重试时间
Multiplier: 3, // 递增倍数
Clock: backoff.SystemClock,
RandomizationFactor: 0.1, // 随机因子
}
复制代码
总结
Dapr Outbox 执行流程简单说就是:先发布一个内部事件,再执行生存业务数据和事件消息,内部事件的订阅者再发布真正的事件消息。Dapr轮询数据库中待发布事件消息是通过订阅一个内部主题(Topic)实现的。
因为状态生存和事件发布是在sidecar中执行,所以
业务代码和事件消息不在同一个事件中
!!!Dapr Outbox是把
业务的状态数据和事件消息在同一个事件中生存
,也就是代码client.ExecuteStateTransactionAsync(...);并且状态数据和事件消息是生存到同一张表state中。
参考:
代码
Enable the transactional outbox pattern
outbox.go
Outbox issues
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4