【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 - 状态机库

[复制链接]
发表于 2022-11-11 20:45:52 | 显示全部楼层 |阅读模式
当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的。
这时就可以引入状态机模型,帮助代码结构变得清晰。
一、状态机库概述

一)简介

状态机由一组状态组成:
【初始状态 -> 中间状态 -> 最终状态】。
在一个状态机中,每个状态会接收一组特定的事件,根据事件类型进行处理,并转换到下一个状态。当转换到最终状态时则退出。
二)状态转换方式

状态间转换会有下面这三种类型:

三)Yarn 状态机类

在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机。如何使用,我们直接写个 demo。

二、案例 demo

在上一篇文章《Yarn 服务库和事件库》案例基础上进行扩展,增加状态机库的内容。如果还不了解服务库和事件库的同学,建议先学习下上一篇文章。
案例已上传至 github,有帮助可以点个 ⭐️
https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo
一)状态机实现

状态机实现,可以直接嵌入到上篇文章中的 AsyncDispatcher使用。
这里仅给出状态机JobStateMachine以及各种事件处理的代码。完整的代码项目执行,请到 github demo 中查看。
  1. import com.shuofxz.event.JobEvent;
  2. import com.shuofxz.event.JobEventType;
  3. import org.apache.hadoop.yarn.event.EventHandler;
  4. import org.apache.hadoop.yarn.state.*;
  5. import java.util.EnumSet;
  6. import java.util.concurrent.locks.Lock;
  7. import java.util.concurrent.locks.ReadWriteLock;
  8. import java.util.concurrent.locks.ReentrantReadWriteLock;
  9. /*
  10. * 可参考 Yarn 中实现的状态机对象:
  11. * ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,
  12. * NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,
  13. * MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
  14. * */
  15. @SuppressWarnings({"rawtypes", "unchecked"})
  16. public class JobStateMachine implements EventHandler<JobEvent> {
  17.     private final String jobID;
  18.     private EventHandler eventHandler;
  19.     private final Lock writeLock;
  20.     private final Lock readLock;
  21.     // 定义状态机
  22.     protected static final StateMachineFactory<JobStateMachine, JobStateInternal,
  23.             JobEventType, JobEvent>
  24.             stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
  25.             .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
  26.             .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
  27.             .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
  28.             .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
  29.             .installTopology();
  30.     private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
  31.     public JobStateMachine(String jobID, EventHandler eventHandler) {
  32.         this.jobID = jobID;
  33.         // 多线程异步处理,state 有可能被同时读写,使用读写锁来避免竞争
  34.         ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  35.         this.readLock = readWriteLock.readLock();
  36.         this.writeLock = readWriteLock.writeLock();
  37.         this.eventHandler = eventHandler;
  38.         stateMachine = stateMachineFactory.make(this);
  39.     }
  40.     protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
  41.         return stateMachine;
  42.     }
  43.     public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
  44.         @Override
  45.         public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
  46.             System.out.println("Receiving event " + jobEvent);
  47.             // do something...
  48.             // 完成后发送新的 Event —— JOB_START
  49.             jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
  50.         }
  51.     }
  52.     public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
  53.         @Override
  54.         public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
  55.             System.out.println("Receiving event " + jobEvent);
  56.             jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
  57.         }
  58.     }
  59.     public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
  60.         @Override
  61.         public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
  62.             System.out.println("Receiving event " + jobEvent);
  63.             jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
  64.         }
  65.     }
  66.     public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {
  67.         @Override
  68.         public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
  69.             System.out.println("Receiving event " + jobEvent);
  70.             // 这是多结果状态部分,因此需要人为制定后续状态
  71.             // 这里整个流程结束,设置一下对应的状态
  72.             boolean flag = true;
  73.             if (flag) {
  74.                 return JobStateInternal.SUCCEEDED;
  75.             } else {
  76.                 return JobStateInternal.KILLED;
  77.             }
  78.         }
  79.     }
  80.     @Override
  81.     public void handle(JobEvent jobEvent) {
  82.         try {
  83.             // 注意这里为了避免静态条件,使用了读写锁
  84.             writeLock.lock();
  85.             JobStateInternal oldState = getInternalState();
  86.             try {
  87.                 getStateMachine().doTransition(jobEvent.getType(), jobEvent);
  88.             } catch (InvalidStateTransitionException e) {
  89.                 System.out.println("Can't handle this event at current state!");
  90.             }
  91.             if (oldState != getInternalState()) {
  92.                 System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());
  93.             }
  94.         } finally {
  95.             writeLock.unlock();
  96.         }
  97.     }
  98.     public JobStateInternal getInternalState() {
  99.         readLock.lock();
  100.         try {
  101.             return getStateMachine().getCurrentState();
  102.         } finally {
  103.             readLock.unlock();
  104.         }
  105.     }
  106.     public enum JobStateInternal {
  107.         NEW,
  108.         SETUP,
  109.         INITED,
  110.         RUNNING,
  111.         SUCCEEDED,
  112.         KILLED
  113.     }
  114. }
复制代码
二)状态机可视化

hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java,可以拷贝到我们的工程中使用。
根据提示,运行需要三个参数:
  1. Usage: %s <GraphName> <class[,class[,...]]> <OutputFile>%n
复制代码

运行后会在项目根目录生成图文件 jsm.gv。
需要使用 graphviz工具将 gv 文件转换成 png 文件:
  1. # linux 安装
  2. yum install graphviz
  3. # mac 安装
  4. brew install graphviz
复制代码
转换:
  1. dot -Tpng jsm.gv > jsm.png
复制代码
可视化状态机展示:

再使用这个工具对 Yarn 中的 Application 状态进行展示:

三)如果不用状态机库

【思考】
如果不用状态机,代码结构会是什么样呢?
下面这样的代码,如果要增加或修改逻辑可能就是很痛苦的一件事情了。
  1. // 一堆的函数调用
  2. // 一堆的 if 嵌套
  3. // 或者 switch case
复制代码
三、总结

本节对 Yarn 状态机库进行了介绍。实际使用时会结合事件库、服务库一同使用。
状态机库的使用帮助代码结构更加的清晰,新增状态处理逻辑只需要增加一个状态类别,或者增加一个方法处理对应类型的事件即可。将整个处理逻辑进行了拆分,便于编写和维护。
参考文章:
源码|Yarn的事件驱动模型与状态机

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表