基于DolphinScheduler抽取通用EventBus组件:支持耽误与事件驱动 ...

打印 上一主题 下一主题

主题 1934|帖子 1934|积分 5812

一、思路来源

虽然guava中的eventbus已经很方便了,但是还是想要实现一个更为方便,同时支持耽误事件、同时带eventbus的组件。在Apache DolphinScheduler项目中,有一个eventbus的组件,这个组件写得挺好的,想着用在业务系统上,因此本身抽取了一下,拿到业务系统中来用。话不多说,我们把它抽取出来吧,同时举行demo的运行。还是要感谢Apache DolphinScheduler的开源,让这个很简单,但是很高效的组件可以或许让我们便捷地使用。
二、具体实现过程

起首是定义事件接口:
  1. public interface IEvent {
  2. }
复制代码
针对事件接口,我们抽象出共性方法接口:耽误时间和过期时间。
  1. public abstractclass AbstractDelayEvent implements IEvent, Delayed {
  2.     privatefinallong delayTime;
  3.     privatefinallong expireTime;
  4.     public long getDelayTime() {
  5.         return delayTime;
  6.     }
  7.     public long getExpireTime() {
  8.         return expireTime;
  9.     }
  10.     public AbstractDelayEvent(long delayTime) {
  11.         this.delayTime = delayTime;
  12.         this.expireTime = System.currentTimeMillis() + delayTime;
  13.     }
  14.     @Override
  15.     public long getDelay(TimeUnit unit) {
  16.         long diff = expireTime - System.currentTimeMillis();
  17.         return unit.convert(diff, TimeUnit.MILLISECONDS);
  18.     }
  19.     @Override
  20.     public int compareTo(Delayed o) {
  21.         if (this.expireTime < ((AbstractDelayEvent) o).expireTime) {
  22.             return -1;
  23.         }
  24.         if (this.expireTime > ((AbstractDelayEvent) o).expireTime) {
  25.             return1;
  26.         }
  27.         return0;
  28.     }
  29. }
复制代码
主要的信息:
定义eventbus中,我们需要使用的方法:
  1. public interface IEventBus<T extends IEvent> {
  2.     void publish(T event);
  3.     Optional<T> poll() throws InterruptedException;
  4.     Optional<T> peek();
  5.     Optional<T> remove();
  6.     boolean isEmpty();
  7.     int size();
  8. }
复制代码
可以看到主要是:发布事件、消费、移除、删除、判断当前的事件是否为空,以及事件大小等方法。其中最重要的方法为发布事件和消费处置惩罚事件方法。
针对当前的事件bus接口举行抽象,抽取出共性方法,方便复用:
  1. public abstractclass AbstractDelayEventBus<T extends AbstractDelayEvent> implements IEventBus<T> {
  2.     protectedfinal DelayQueue<T> delayEventQueue = new DelayQueue<>();
  3.     @Override
  4.     public void publish(T event) {
  5.         delayEventQueue.put(event);
  6.     }
  7.     @Override
  8.     public Optional<T> poll() throws InterruptedException {
  9.         // 使用带超时的 poll 方法,等待事件到期
  10.         return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS));
  11.     }
  12.     @Override
  13.     public Optional<T> peek() {
  14.         return Optional.ofNullable(delayEventQueue.peek());
  15.     }
  16.     @Override
  17.     public Optional<T> remove() {
  18.         return Optional.ofNullable(delayEventQueue.poll());
  19.     }
  20.     @Override
  21.     public boolean isEmpty() {
  22.         return delayEventQueue.isEmpty();
  23.     }
  24.     @Override
  25.     public int size() {
  26.         return delayEventQueue.size();
  27.     }
  28. }
复制代码
三、测试运行效果

接下来,我们使用它,来举行处置惩罚:
定义本身的耽误事件:
如果是在业务中,可以定义本身的业务数据信息事件对象
  1. public class MyDelayEvent extends AbstractDelayEvent {
  2.     private final String message;
  3.     public MyDelayEvent(long delayTime, String message) {
  4.         super(delayTime);
  5.         this.message = message;
  6.     }
  7.     public String getMessage() {
  8.         return message;
  9.     }
  10. }
复制代码
定义事件耽误事件bus
当然也可以举行本身的可定制化特性。
  1. public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> {
  2.     // 不需要额外的修改
  3. }
复制代码
举行测试:
思路:创建事件总线、发布事件,然后针对发布的事件信息,举行消费,然后等待耽误时间的到来,从而实现消费,从而举行业务的处置惩罚。
  1. import java.util.Optional;
  2. publicclass EventBusExample {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         // 创建事件总线
  5.         IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus();
  6.         // 发布单个事件
  7.         eventBus.publish(new MyDelayEvent(100, "Single Event"));
  8.         System.out.println("After publish, event bus size: " + eventBus.size());
  9.         // 持续尝试消费事件
  10.         while (true) {
  11.             Optional<MyDelayEvent> event = eventBus.poll();
  12.             if (event.isPresent()) {
  13.                 System.out.println("Received event: " + event.get().getMessage());
  14.             } else {
  15.                 System.out.println("No event received within the timeout.");
  16.                 break;
  17.             }
  18.         }
  19.         // 检查总线大小
  20.         System.out.println("Event bus size: " + eventBus.size());
  21.     }
  22. }
复制代码
运行结果:

可以看到实现本身的业务逻辑还是很方便的,可以本身实现吧,这里给出的代码是可以运行的。
作者 | 刘亚洲
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

泉缘泉

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表