This class manages the list of applications for the resource manager.
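1. Introduction

RMAppManager is responsible for starting and shutting down applications.
After ClientRMService receives an application submission request from a client, it calls RMAppManager#submitApplication to create an RMApp object, which maintains the application's entire lifecycle from start to finish. When the RMApp finishes, it sends an RMAppManagerEventType.APP_COMPLETED event to RMAppManager. On receiving that event, RMAppManager calls RMAppManager#finishApplication to perform the cleanup work, which includes: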
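❑ Adding the application to the list of completed applications so that users can query information about past runs. Note that this list is bounded. The limit is given here as 10000 by default, while the code comments in section 2 show 1000, so check the default for your Hadoop version; administrators can change it with the parameter yarn.resourcemanager.max-completed-applications. Once the number of completed applications exceeds this value, applications are removed from the in-memory data structures. The removed applications can be called "expired applications", and users can then obtain their information only through the History Server, which reads it from files on disk (applications write their run logs and basic information to disk).
❑ Removing the application from RMStateStore. RMStateStore records the run logs of running applications. After a cluster failure and restart, ResourceManager can use these logs to recover application state and so avoid rerunning everything. Once an application has finished, these logs no longer serve a purpose and can be deleted. This is part of ResourceManager's fault-tolerance mechanism.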
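The two cleanup steps above can be pictured with a small sketch. It follows the description in this section rather than the Hadoop source: `CompletedAppHistory`, its methods, and the string application IDs are hypothetical names, and plain maps stand in for the ResourceManager's in-memory application table and for RMStateStore.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the completed-application bookkeeping described above.
class CompletedAppHistory {
    private final int maxCompletedInMemory;
    // Completed application IDs in completion order (oldest first).
    private final LinkedList<String> completedApps = new LinkedList<>();
    // Assumption: a plain map plays the role of the in-memory application table.
    private final Map<String, String> appsInMemory = new HashMap<>();
    // Assumption: a plain map plays the role of RMStateStore.
    private final Map<String, String> stateStore = new HashMap<>();

    CompletedAppHistory(int maxCompletedInMemory) {
        this.maxCompletedInMemory = maxCompletedInMemory;
    }

    // A submitted application is tracked in memory and in the state store.
    void register(String appId) {
        appsInMemory.put(appId, "summary of " + appId);
        stateStore.put(appId, "recovery data of " + appId);
    }

    // Cleanup on completion; returns the IDs that became "expired".
    List<String> finish(String appId) {
        completedApps.add(appId);
        stateStore.remove(appId); // recovery data is no longer needed
        List<String> expired = new ArrayList<>();
        while (completedApps.size() > maxCompletedInMemory) {
            String oldest = completedApps.removeFirst();
            appsInMemory.remove(oldest); // now only the History Server knows it
            expired.add(oldest);
        }
        return expired;
    }

    boolean isQueryable(String appId) {
        return appsInMemory.containsKey(appId);
    }

    boolean isInStateStore(String appId) {
        return stateStore.containsKey(appId);
    }

    public static void main(String[] args) {
        CompletedAppHistory history = new CompletedAppHistory(2);
        for (String id : new String[] {"app_1", "app_2", "app_3"}) {
            history.register(id);
        }
        history.finish("app_1");
        history.finish("app_2");
        System.out.println(history.finish("app_3")); // app_1 is evicted here
    }
}
```

With a limit of 2, finishing a third application pushes the first one out of memory, which is the point at which a user would have to fall back to the History Server.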
2. Fields

    // Maximum number of completed applications kept in memory
    // yarn.resourcemanager.max-completed-applications : 1000
    private int maxCompletedAppsInMemory;

    // Maximum number of completed applications kept in the state store
    // yarn.resourcemanager.state-store.max-completed-applications : maxCompletedAppsInMemory [1000]
    private int maxCompletedAppsInStateStore;

    // Number of completed applications currently in the state store
    protected int completedAppsInStateStore = 0;

    // Completed applications
    protected LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();

    // The RM context
    private final RMContext rmContext;

    // Handles communication with the ApplicationMaster
    private final ApplicationMasterService masterService;

    // The scheduler
    private final YarnScheduler scheduler;

    // Access control lists
    private final ApplicationACLsManager applicationACLsManager;

    // Configuration
    private Configuration conf;

    // Authorization
    private YarnAuthorizationProvider authorizer;

    // Whether the timeline server (v2) is enabled
    private boolean timelineServiceV2Enabled;

    // Whether node labels are enabled
    private boolean nodeLabelsEnabled;

3. Constructor
The instance is constructed in ResourceManager#serviceInit. The constructor mainly performs initialization.
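One detail of this initialization is worth isolating: the state-store limit defaults to the in-memory limit and is never allowed to exceed it. The following is a minimal sketch of that rule only. `CompletedAppLimits` is a hypothetical class, and a plain `Map` stands in for Hadoop's `Configuration`; the property names and the default of 1000 are taken from the comments in section 2.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mirrors how the two completed-application limits are derived.
class CompletedAppLimits {
    static final String MEMORY_KEY =
        "yarn.resourcemanager.max-completed-applications";
    static final String STORE_KEY =
        "yarn.resourcemanager.state-store.max-completed-applications";
    static final int DEFAULT_MEMORY_LIMIT = 1000;

    final int inMemory;
    final int inStateStore;

    // Assumption: a Map<String, String> stands in for Hadoop's Configuration.
    CompletedAppLimits(Map<String, String> conf) {
        this.inMemory = getInt(conf, MEMORY_KEY, DEFAULT_MEMORY_LIMIT);
        // The state-store limit falls back to the in-memory limit when unset.
        int requested = getInt(conf, STORE_KEY, this.inMemory);
        // The state store never keeps more completed apps than memory does.
        this.inStateStore = Math.min(requested, this.inMemory);
    }

    private static int getInt(Map<String, String> conf, String key, int defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(MEMORY_KEY, "500");
        conf.put(STORE_KEY, "800"); // larger than the in-memory limit
        CompletedAppLimits limits = new CompletedAppLimits(conf);
        System.out.println(limits.inMemory + " " + limits.inStateStore); // prints 500 500
    }
}
```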
    public RMAppManager(RMContext context, YarnScheduler scheduler,
            ApplicationMasterService masterService,
            ApplicationACLsManager applicationACLsManager, Configuration conf) {
        this.rmContext = context;
        this.scheduler = scheduler;
        this.masterService = masterService;
        this.applicationACLsManager = applicationACLsManager;
        this.conf = conf;

        // yarn.resourcemanager.max-completed-applications : 1000
        this.maxCompletedAppsInMemory = conf.getInt(
                YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
                YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);

        // yarn.resourcemanager.state-store.max-completed-applications : maxCompletedAppsInMemory [1000]
        this.maxCompletedAppsInStateStore = conf.getInt(
                YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
                this.maxCompletedAppsInMemory);

        // If the state-store limit is larger than the in-memory limit,
        // clamp it to the in-memory limit
        if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
            this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
        }

        // Build the authorization provider
        this.authorizer = YarnAuthorizationProvider.getInstance(conf);

        // Whether timeline service v2 is enabled: yarn.timeline-service.versions includes 2
        this.timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);

        // Whether node labels are enabled: yarn.node-labels.enabled : false
        this.nodeLabelsEnabled = YarnConfiguration.areNodeLabelsEnabled(
                rmContext.getYarnConfiguration());
    }

4. finishApplication
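This method notifies the DelegationTokenRenewer when security is enabled, adds the applicationId to the completedApps collection, increments completedAppsInStateStore by 1, and writes an audit log entry.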
    protected synchronized void finishApplication(ApplicationId applicationId) {
        if (applicationId == null) {
            LOG.error("RMAppManager received completed appId of null, skipping");
        } else {
            // Security check: inform the DelegationTokenRenewer
            if (UserGroupInformation.isSecurityEnabled()) {
                rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
            }

            // Record the completed application
            completedApps.add(applicationId);
            completedAppsInStateStore++;
            writeAuditLog(applicationId);
        }
    }

5. submitApplication
Submits the application and hands an RMAppEvent to the Dispatcher for processing.
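The method's own comments note that the dispatcher may not be started yet when the START event is enqueued, and that such events must be the first ones processed once it starts. The sketch below illustrates only that ordering guarantee. `QueueingDispatcher` is a hypothetical, single-threaded class with string events; it is not YARN's dispatcher, which runs asynchronously on its own thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-threaded dispatcher: events sent before start() are
// queued and then delivered first, in FIFO order, once start() is called.
class QueueingDispatcher {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Consumer<String> handler;
    private boolean started = false;

    QueueingDispatcher(Consumer<String> handler) {
        this.handler = handler;
    }

    // Enqueue an event; deliver immediately only if already started.
    void handle(String event) {
        pending.add(event);
        if (started) {
            drain();
        }
    }

    // Start delivering, beginning with everything queued so far.
    void start() {
        started = true;
        drain();
    }

    private void drain() {
        String event;
        while ((event = pending.poll()) != null) {
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        QueueingDispatcher dispatcher = new QueueingDispatcher(seen::add);
        dispatcher.handle("app_1:START"); // queued, dispatcher not started
        dispatcher.handle("app_2:START"); // queued
        dispatcher.start();               // both delivered, in order
        dispatcher.handle("app_3:START"); // delivered immediately
        System.out.println(seen);
    }
}
```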
    @SuppressWarnings("unchecked")
    protected void submitApplication(
            ApplicationSubmissionContext submissionContext, long submitTime,
            String user) throws YarnException {
        // Get the applicationId
        ApplicationId applicationId = submissionContext.getApplicationId();

        // Build the RMAppImpl.
        // Passing start time as -1. It will be eventually set in RMAppImpl constructor.
        RMAppImpl application = createAndPopulateNewRMApp(
                submissionContext, submitTime, user, false, -1, null);
        try {
            // Security check: hand the credentials to the DelegationTokenRenewer
            if (UserGroupInformation.isSecurityEnabled()) {
                this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
                        applicationId,
                        BuilderUtils.parseCredentials(submissionContext),
                        submissionContext.getCancelTokensWhenComplete(),
                        application.getUser(),
                        BuilderUtils.parseTokensConf(submissionContext));
            } else {
                // Fire the RMAppEvent START event.
                // Dispatcher is not yet started at this time, so these START events
                // enqueued should be guaranteed to be first processed when dispatcher
                // gets started.
                this.rmContext.getDispatcher().getEventHandler().handle(
                        new RMAppEvent(applicationId, RMAppEventType.START));
            }
        } catch (Exception e) {
            LOG.warn("Unable to parse credentials for " + applicationId, e);
            // Sending APP_REJECTED is fine, since we assume that the
            // RMApp is in NEW state and thus we haven't yet informed the
            // scheduler about the existence of the application
            this.rmContext.getDispatcher().getEventHandler().handle(
                    new RMAppEvent(applicationId,
                            RMAppEventType.APP_REJECTED, e.getMessage()));
            throw RPCUtil.getRemoteException(e);
        }
    }

6. handle
    @Override
    public void handle(RMAppManagerEvent event) {
        // Get the applicationId
        ApplicationId applicationId = event.getApplicationId();
        LOG.debug("RMAppManager processing event for " + applicationId
                + " of type " + event.getType());
        switch (event.getType()) {
            // Application completed
            case APP_COMPLETED:
                finishApplication(applicationId);
                // Log the application summary
                logApplicationSummary(applicationId);
                // Check the completed-application limits and evict if needed
                checkAppNumCompletedLimit();
                break;
            // Application moved
            case APP_MOVE:
                // moveAllApps from scheduler will fire this event for each of
                // those applications which needed to be moved to a new queue.
                // Use the standard move application api to do the same.
                try {
                    moveApplicationAcrossQueue(applicationId,
                            event.getTargetQueueForMove());
                } catch (YarnException e) {
                    LOG.warn("Move Application has failed: " + e.getMessage());
                }
                break;
            default:
                LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
        }
    }

7. moveApplicationAcrossQueue
moveApplicationAcrossQueue (called moveToQueue in its Javadoc) invokes the scheduler API to perform the queue-move operation.
    /**
     * moveToQueue will invoke scheduler api to perform move queue operation.
     *
     * @param applicationId
     *          Application Id.
     * @param targetQueue
     *          Target queue to which this app has to be moved.
     * @throws YarnException
     *           Handle exceptions.
     */
    public void moveApplicationAcrossQueue(ApplicationId applicationId,
            String targetQueue) throws YarnException {
        // Get the application
        RMApp app = this.rmContext.getRMApps().get(applicationId);

        // Capacity scheduler will directly follow below approach.
        // 1. Do a pre-validate check to ensure that changes are fine.
        // 2. Update this information to state-store
        // 3. Perform real move operation and update in-memory data structures.
        synchronized (applicationId) {
            // Nothing to do if the app is unknown or already completed
            if (app == null || app.isAppInCompletedStates()) {
                return;
            }

            // Get the source queue
            String sourceQueue = app.getQueue();

            // 1. pre-validate move application request to check for any access
            // violations or other errors. If there are any violations, YarnException
            // will be thrown.
            rmContext.getScheduler().preValidateMoveApplication(applicationId,
                    targetQueue);

            // 2. Update to state store with new queue and throw exception if it failed.
            updateAppDataToStateStore(targetQueue, app, false);

            // 3. Perform the real move application; the scheduler in use
            // supplies its own moveApplication implementation
            String queue = "";
            try {
                queue = rmContext.getScheduler().moveApplication(applicationId,
                        targetQueue);
            } catch (YarnException e) {
                // Revert to source queue since in-memory move has failed. Chances
                // of this is very rare as we have already done the pre-validation.
                updateAppDataToStateStore(sourceQueue, app, true);
                throw e;
            }

            // update in-memory
            if (queue != null && !queue.isEmpty()) {
                app.setQueue(queue);
            }
        }

        rmContext.getSystemMetricsPublisher().appUpdated(app,
                System.currentTimeMillis());
    }
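The validate, persist, move, and revert-on-failure sequence used here can be tried out in isolation with the sketch below. Everything in it is hypothetical and simplified: `QueueMoveSketch`, its nested `Scheduler` interface, and `MoveException` (an unchecked stand-in for YarnException) are illustration names, and two maps stand in for the state store and the in-memory application data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "pre-validate, persist, move, revert on failure".
class QueueMoveSketch {
    // Assumption: unchecked stand-in for YarnException, to keep the sketch short.
    static class MoveException extends RuntimeException {
        MoveException(String message) {
            super(message);
        }
    }

    // Assumption: minimal scheduler contract used only by this sketch.
    interface Scheduler {
        void preValidate(String appId, String targetQueue);
        String move(String appId, String targetQueue);
    }

    private final Map<String, String> stateStore = new HashMap<>(); // persisted queue
    private final Map<String, String> inMemory = new HashMap<>();   // in-memory queue
    private final Scheduler scheduler;

    QueueMoveSketch(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    void addApp(String appId, String queue) {
        stateStore.put(appId, queue);
        inMemory.put(appId, queue);
    }

    String persistedQueue(String appId) {
        return stateStore.get(appId);
    }

    String currentQueue(String appId) {
        return inMemory.get(appId);
    }

    void moveAcrossQueue(String appId, String targetQueue) {
        String sourceQueue = inMemory.get(appId);
        if (sourceQueue == null) {
            return; // unknown application, nothing to move
        }
        scheduler.preValidate(appId, targetQueue); // 1. fail early, nothing changed
        stateStore.put(appId, targetQueue);        // 2. persist the new queue
        String queue;
        try {
            queue = scheduler.move(appId, targetQueue); // 3. the real move
        } catch (MoveException e) {
            stateStore.put(appId, sourceQueue);    // revert the persisted queue
            throw e;
        }
        if (queue != null && !queue.isEmpty()) {
            inMemory.put(appId, queue);            // update in-memory state
        }
    }

    public static void main(String[] args) {
        QueueMoveSketch sketch = new QueueMoveSketch(new Scheduler() {
            public void preValidate(String appId, String targetQueue) { }
            public String move(String appId, String targetQueue) {
                return targetQueue;
            }
        });
        sketch.addApp("app_1", "default");
        sketch.moveAcrossQueue("app_1", "prod");
        System.out.println(sketch.currentQueue("app_1") + " " + sketch.persistedQueue("app_1"));
    }
}
```

Persisting before the in-memory move means that a crash between the two steps leaves the state store pointing at the target queue, which is why the failure path has to write the source queue back explicitly.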