利用谷歌云Pub/Sub 实现多任务并行分发处置惩罚方案

嚴華  金牌会员 | 2024-6-14 23:17:19 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 684|帖子 684|积分 2052

背景

目前老梁团队负责的Global Data Integration Platform每天有大量文件需要从来自不同地区的上游下载文件并进行处置惩罚后再发送到不同下游。老梁的数据集成平台集群有6个服务器节点,老梁渴望所有呆板的资源都能利用上,提升大量文件并行处置惩罚本领,并且不同呆板节点的任务必须不能重复,否则大概造成文件下载或处置惩罚失败。
原有的服务是使用Quarz集群,通过定时调度去下载,但是Quartz调度框架固然自己支持负载均衡,但是其Cluster每个节点都不是均衡分配任务,假如某一节点具有竞争资源上风,有机会一直持有任务,导致其他节点空闲下来,服务器大概某资质源消耗过大而导致宕机,这并不是老梁想要的结果。后来也尝试使用生产者消费者模型,通过F5负载均衡+API关照+异步回调方式后,服务多节点并行处置惩罚本领有所增强,但由于使用Http方式进行通信导致服务之间存在直接依靠,当消费者服务进行重启或者停机,存在生产者API关照失败的大概,需要做额外的补偿处置惩罚。如下图所示:
生产者消费者模型:

办理思绪

目前老梁公司已经完成了谷歌云和公司机房的网络搭建,并且公司的自有数据中央跟谷歌云可以直接通过谷歌的Dedicated Interconnect服务,也就是可以通过专线直接进行连接。固然老梁的数据集成平台还部署在自有数据中央,但相对于文件下载的时间和速度消耗,谷歌云上的服务通过专线进行通信所带来的性能消耗几乎可以忽略(大约几百毫秒),老梁公司的架构战略方向是优先使用云组件,减少On-Premise部署。末了老梁选择采用谷歌云Pub/Sub服务作为变乱消息服务,利用Pub/Sub高可用、使用简朴并天然支持多消息并行传输的特性,来对现有的数据集成平台进行改造。
Pub/Sub先容:
Pub/Sub 是一种筹划为高度可靠且可伸缩的异步消息传递服务。该服务以十多年来许多 Google 产品都在依靠的核心 Google 基础架构组件为基础而构建。其实可以理解成云上的Kafka。官网:https://cloud.google.com/pubsub/architecture?hl=zh-cn


  • Pub/Sub 是一种可扩缩的异步消息传递服务,可将天生消息的服务与处置惩罚这些消息的服务分离开来。
  • Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
  • Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处置惩罚任务的队列,它都非常有用。
  • 通过 Pub/Sub,您可以创建变乱提供方和使用方的系统,称为发布者和订阅者。发布者通过广播变乱而不是同步远程过程调用 (RPC) 与订阅者异步通信。
  • 发布者将变乱发送到 Pub/Sub 服务,而不思量怎样或何时处置惩罚这些变乱。然后,Pub/Sub 会将变乱传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的机动性和稳健性。
** 基于Pub/Sub改造后的模型: **
各个消费者节点所拿到的变乱都不会重复

大概实施方案

这里只使用模拟场景展示大概思绪,具体细节还需要根据各自项目进行优化。
注意事项:


  • 首先你要创建你应用要使用的Topic和Subscription,这里需要注意的是Subscription的ACK停止时间建议设置大点,否者假如你消费者如果消费变乱所消耗的时间>ACK停止时间,Pub/Sub将会对消息进行重发,这时候会存在重复变乱消息。也就是说,你要确保你的消费节点能在ACK停止时间之前处置惩罚好变乱并且响应ACK给Pub/SUb。

  • 建议你服务使用Pull方式从Pub/Sub的Subscription拉取消息,由于如许可以在你Consumer代码里自由设置你哀求所需要的参数,例如setMaxMessages方法可以让你自由定义你每次拉取多少变乱,更好地基于你服务器的本领去设置,并且也可以避免在做负载均衡的时候某些呆板节点所拿到的任务变乱太多导致服务器节点的资源没办法充分利用。


  • 使用Pub/Sub的自定义Event(变乱)必须要自定义一个唯一标识,如许可以在Consumer逻辑加上幂等控制,否则当刚好消费者没有及时处置惩罚变乱而Pub/Sub由于消费者ACK超时进行补偿重发,这大概会由于重复处置惩罚变乱给业务带来严肃结果。GCP Pub/Sub采用的是至少一次投递的策略,也就是大概对同一消息投递多次,固然实际应用中不常见,以下官方文档阐明了会重复投递的环境,通常就是上面所说的ACK超时导致的。

完成流程

这里只截取小部分文件下载的流程作为示范,其他类似需要并行处置惩罚的任务都可以参考。


  • File Watch Dog 从上游远程服务器基于File Pattern去监测有没有新文件
  • File Watch Dog 把监测到的新文件信息组装成变乱分别推送到GCP Pub/Sub Topic
  • File Process Engine所有节点并行从GCP Pub/Sub Subscription拉取任务,分别拉到不同的变乱消息
  • File Process Engine所有节点分别基于变乱消息里的DatafeedId去设置中央查找该Datafeed的连接信息
  • File Process Engine所有节点分别去上游远程服务器下载自己接收到的变乱对应的文件
简朴测试

这里使用官方提供的示例代码,简朴测试下发布多个消息,看看消费者代码是否会重复消费类似变乱。
参考示例:https://cloud.google.com/pubsub/docs/pull#java
  1. import com.google.cloud.pubsub.v1.AckReplyConsumer;
  2. import com.google.cloud.pubsub.v1.MessageReceiver;
  3. import com.google.cloud.pubsub.v1.Subscriber;
  4. import com.google.pubsub.v1.ProjectSubscriptionName;
  5. import com.google.pubsub.v1.PubsubMessage;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.TimeoutException;
  8. public class SubscribeAsyncExample {
  9.   public static void main(String... args) throws Exception {
  10.     // TODO(developer): Replace these variables before running the sample.
  11.     String projectId = "your-project-id";
  12.     String subscriptionId = "your-subscription-id";
  13.     subscribeAsyncExample(projectId, subscriptionId);
  14.   }
  15.   public static void subscribeAsyncExample(String projectId, String subscriptionId) {
  16.     ProjectSubscriptionName subscriptionName =
  17.         ProjectSubscriptionName.of(projectId, subscriptionId);
  18.     // Instantiate an asynchronous message receiver.
  19.     MessageReceiver receiver =
  20.         (PubsubMessage message, AckReplyConsumer consumer) -> {
  21.           // Handle incoming message, then ack the received message.
  22.           System.out.println("Received MessageId: " + message.getMessageId()+"Data: " + message.getData().toStringUtf8());
  23.           consumer.ack();
  24.           System.out.println("Message has been acknowledge")
  25.         };
  26.     Subscriber subscriber = null;
  27.     try {
  28.       subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  29.       // Start the subscriber.
  30.       subscriber.startAsync().awaitRunning();
  31.       System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
  32.       // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
  33.       subscriber.awaitTerminated(30, TimeUnit.SECONDS);
  34.     } catch (TimeoutException timeoutException) {
  35.       // Shut down the subscriber after 30s. Stop receiving messages.
  36.       subscriber.stopAsync();
  37.     }
  38.   }
  39. }
复制代码
这里我在Topic发布了5条带有序号的消息,分别是:test:1、test:2、test:3、test:4、test:5,然后开了三个进程去监听Subscription,看看会不会每个进程会不会出现重复的消息
进程1

进程2

进程3


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

嚴華

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表