YARN/MRv2 ResourceManager端 源码分析1
2. ResourceManager端
Client端通过YarnRunner.submitJob()将Application提交给了ResourceManager。
连接Client与ResourceManager的协议为ClientRMProtocol,该协议的实现类为ClientRMService。
1) ClientRMService.java
Client端与ResourceManager交互的所有操作最终都是由ClientRMService中的操作实现的。以submitApplication()为例。
?
??
? 2) RMAppmanager.java
RMAppManager实现了EventHandler接口,代表该类是用于处理某种事件的
?
??
3) EventHandler
自此AsyncDispatcher将接管之后所有的事件分发,所有事件都将由AsyncDispatcher分发给对应的EventDispatcher。EventDispatcher会初始化处理该事件的类,并将事件交给创建的类来进行处理。以RMAppEventType.START事件为例,该类将分发给ApplicationEventDispatcher,然后由ApplicationEventDispatcher初始化RMApp的实现类RMAppImpl来处理。
?
??
?RMAppImpl.java
?
??
状态机的工作方式如下,以RMAppImpl中的状态机为例
?
/* 泛型参数从左到右依次为执行状态变换的类、封装状态的类、封装事件类型的类、封装事件的类 构造函数参数为该状态机的起始状态 */ private static final StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory = new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW)/* 添加状态之间的变换以及变换时的需要进行的操作的封装类 以下即表示状态从RMAppState.NEW -> RMAppState.SUBMITTED 的触发事件类型为RMAppEventType.START事件 需要执行的方法被封装在StartAppAttemptTransition类中 */.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,RMAppEventType.START, new StartAppAttemptTransition())......// 构建状态机.installTopology();// 封装执行状态变化所需的方法的类需要实现SingleArcTransition接口,以StartAppAttemptTransition为例// 每个状态机都会对应一个实现了SingleArcTransition接口的类,在这里为RMAppTransition// StartAppAttemptTransition通过继承RMAppTransition并实现transition方法,在该方法中实现状态变化的处理逻辑 private static final class StartAppAttemptTransition extends RMAppTransition {public void transition(RMAppImpl app, RMAppEvent event) { app.createNewAttempt();}; }??
至此,一个典型的由状态机分发事件并进行处理的相关类介绍完毕。总结如下:
(1) AsyncDispatcher根据接收到的事件按它的类分发给相应的EventDispatcher
(2) EventDispatcher初始化处理该类事件的类A,并将事件传递给A
(3) A调用状态机的doTransition方法确定该事件类型对应的状态变化和封装了需要执行的方法的类
(4) 实现了SingleArcTransition接口的类,将调用transition方法完成状态变换
由于状态机处理代码大都相同,以下将以事件为标题来描述状态变换和涉及的类和操作
4) RMAppEventType.START
EventDispatcher:ApplicationEventDispatcher
事件处理类:RMAppImpl
状态更新:RMAppState.NEW -> RMAppState.SUBMITTED
所需操作:创建RMAppAttemptImpl对象,初始化其状态为RMAppAttemptState.NEW
触发RMAppAttemptEventType.START事件
5) RMAppAttemptEventType.START
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED
所需操作:向ApplicationMasterService注册该AppAttempt
触发AppAddedSchedulerEvent事件
6) AppAddedSchedulerEvent
EventDispatcher:SchedulerEventDispatcher
事件处理类:FifoScheduler/CapacityScheduler
所需操作:创建SchedulerApp对象
触发RMAppAttemptEventType.APP_ACCEPTED事件
7) RMAppAttemptEventType.APP_ACCEPTED
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED
所需操作:调用ResourceScheduler的allocate函数,向ResourceManager申请运行 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ApplicationMaster需要的Container
触发RMAppEventType.APP_ACCEPTED事件
8) RMAppEventType.APP_ACCEPTED
EventDispatcher:ApplicationEventDispatcher
事件处理类:RMAppImpl
状态更新:RMAppState.SUBMITTED -> RMAppState.ACCEPTED
9) 某个NodeManager向ResourceManager发送心跳
? ? ? 10) ResourceManager的ResourceTrackerService收到心跳信息后触发封装了 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RMNodeEventType.STATUS_UPDATE的RMNodeStatusEvent事件
? ? ? 11) RMNodeEventType.STATUS_UPDATE
EventDispatcher:ApplicationEventDispatcher
事件处理类:RMNodeImpl
状态更新:RMNodeState.RUNNING -> RMNodeState.RUNNING
所需操作:更新节点的健康状态
触发NodeUpdateSchedulerEvent事件
? ? ? 12) NodeUpdateSchedulerEvent
EventDispatcher:SchedulerEventDispatcher
事件处理类:FifoScheduler/CapacityScheduler
所需操作:创建SchedulerApp对象,调用assginContainers为该application分配一个 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? container,此时还未真正分配
触发RMContainerEventType.START事件
? ? ? 13) RMContainerEventType.START
EventDispatcher:NodeEventDispatcher
事件处理类:RMContainerImpl
状态更新:RMContainerState.NEW -> RMContainerState.ALLOCATED
所需操作:触发RMAppAttemptContainerAllocatedEvent
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (RMAppAttemptEventType.CONTAINER_ALLOCATED)事件
? ? ? 14) RMAppAttemptEventType.CONTAINER_ALLOCATED
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED
所需操作:调用Scheduler的allocate函数申请一个container
触发AMLauncherEventType.LAUNCH事件
? ? ? 15) AMLauncherEventType.LAUNCH
事件处理类:ApplicationMasterLauncher
状态更新:RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED
所需操作:创建AMLauncher对象,并将其添加到队列masterEvents中;
LauncherThread不断从masterEvents取出,进行处理,并调用
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? AMLauncher.launch()函数;
AMLancher.launch()调用ContainerManager.startContainer()函数创建
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? container;
同时触发RMAppAttemptEventType.LAUNCHED事件。
? ? ? 16) RMAppAttemptEventType.LAUNCHED
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED
所需操作:向AMLivelinessMonitor注册,用于实时监控该Application的状态
? ? ? 17) 第9)步中向ResourceManager发送心跳的NodeManager,调用
? ? ? ? ? ? ?AMRMProtocol.registerApplicationMaster()向ApplicationMasterService进行注册
? ? ? 18) ApplicationMasterService.registerApplicationMaster()
从request中获取ApplicationAttempt的相关信息
触发RMAppAttemptEventType.REGISTERED事件
返回给调用端response
? ? ? 19) RMAppAttemptEventType.REGISTERED
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING
所需操作:设置Application注册后的信息,如运行的host、端口、TrackingURL等
触发RMAppEventType.ATTEMPT_REGISTERED事件
? ? ? 20) 触发RMAppEventType.REGISTERED
EventDispatcher:ApplicationAttemptEventDispatcher
事件处理类:RMAppAttemptImpl
状态更新:RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING
所需操作:设置Application注册后的信息,如运行的host、端口、TrackingURL等
触发RMAppEventType.ATTEMPT_REGISTERED事件
? ? ? 21) RMAppEventType.ATTEMPT_REGISTERED
EventDispatcher:ApplicationEventDispatcher
事件处理类:RMAppImpl
状态更新:RMAppState.ACCEPTED -> RMAppState.RUNNING ??
至此,ApplicationMaster(MRAppMaster)创建完毕,之后Application的运行将由ApplicationMaster(MRAppMaster)接管,它将负责向ResourceManager申请运行子任务所需的资源,监控子任务的运行状态,并向ResourceManager汇报Application的运行状态
?