Hadoop2.x Yarn作业提交(服务端)


  RM接收到客户端的作业提交请求时,由RPC server进行响应;客户端则是通过ApplicationClientProtocol的RPC客户端提交作业的(客户端的提交流程参见上篇文章)。在提交阶段的代码中,客户端首先会调用getNewApplication来获得一个GetNewApplicationResponse,该返回对象中包含了APP的ApplicationId和调度器资源信息。需要注意的是,RM服务端有多个RPC server,其中服务于作业提交的server为ClientRMService,默认监听8032端口,可以通过yarn.resourcemanager.address配置。下面是服务端的getNewApplication,包含在ClientRMService.java中:
/**
 * Answers a client's "new application" request with a freshly allocated
 * application id and the scheduler's maximum resource capability.
 *
 * @param request the incoming request; this method does not read it
 * @return a response carrying the new application id and the maximum
 *         resource capability reported by the scheduler
 * @throws YarnException declared in the signature; nothing in this body
 *         throws it explicitly
 */
@Override
public GetNewApplicationResponse getNewApplication(
    GetNewApplicationRequest request) throws YarnException {
  // Start from an empty response record.
  GetNewApplicationResponse newAppResponse =
      recordFactory.newRecordInstance(GetNewApplicationResponse.class);

  // Hand the client the id it must use when it later submits the application.
  newAppResponse.setApplicationId(getNewApplicationId());

  // Tell the client the largest resource allocation the scheduler reports.
  // NOTE(review): only the maximum is set here; presumably it derives from
  // yarn.scheduler.maximum-allocation-mb / -vcores - confirm in the scheduler.
  newAppResponse.setMaximumResourceCapability(
      scheduler.getMaximumResourceCapability());

  return newAppResponse;
}
作业ID通过getNewApplicationId获得,由集群启动时间戳和自增计数器共同构成:
/**
 * Allocates the next application id from the cluster timestamp and the
 * incremented application counter, and logs the numeric part of the id.
 *
 * @return the newly built application id
 */
ApplicationId getNewApplicationId() {
  // Read the timestamp before bumping the counter, as the original call did.
  long clusterTimestamp = ResourceManager.getClusterTimeStamp();
  int sequenceNumber = applicationCounter.incrementAndGet();

  ApplicationId freshId =
      org.apache.hadoop.yarn.server.utils.BuilderUtils.newApplicationId(
          recordFactory, clusterTimestamp, sequenceNumber);

  LOG.info("Allocated new applicationId: " + freshId.getId());
  return freshId;
}
作业ID的构建函数
/**
 * Creates an {@code ApplicationId} record from a cluster timestamp and a
 * numeric id.
 *
 * @param clusterTimestamp the cluster timestamp to store in the id
 * @param id the numeric id to store
 * @return the populated application id, after {@code build()} has been called
 */
public static ApplicationId newInstance(long clusterTimestamp, int id) {
  ApplicationId created = Records.newRecord(ApplicationId.class);

  // Populate both components before build() is invoked.
  created.setClusterTimestamp(clusterTimestamp);
  created.setId(id);
  created.build();

  return created;
}
在客户端接收到返回信息后,便知道了自己的作业ID、资源分配的最大值,下面进入提交阶段,依然在ClientRMService中
/**
 * Accepts an application submission from a client.
 *
 * <p>The method resolves the calling user, rejects an application id that is
 * already registered in the RM context, fills in defaults for a missing
 * queue, name and type, truncates an over-long application type, and then
 * delegates to {@code rmAppManager.submitApplication}. Success and failure
 * are both written to the audit log.
 *
 * @param request the submission request carrying the submission context
 * @return an empty {@code SubmitApplicationResponse}
 * @throws YarnException if the current user cannot be determined, the
 *         application id is already present, or the app manager rejects
 *         the submission
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  // Extract the submission context and the application id it names.
  ApplicationSubmissionContext appContext =
      request.getApplicationSubmissionContext();
  ApplicationId appId = appContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  // Resolve the short name of the calling user.
  String submitter = null;
  try {
    submitter = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ioe) {
    LOG.warn("Unable to get the current user.", ioe);
    // submitter is still null at this point; it is passed to the audit log
    // as-is.
    RMAuditLogger.logFailure(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        ioe.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw RPCUtil.getRemoteException(ioe);
  }

  // Though duplication will checked again when app is put into rmContext,
  // but it is good to fail the invalid submission as early as possible.
  boolean alreadyRegistered = rmContext.getRMApps().get(appId) != null;
  if (alreadyRegistered) {
    String duplicateMsg = "Application with id " + appId +
        " is already present! Cannot add a duplicate!";
    LOG.warn(duplicateMsg);
    RMAuditLogger.logFailure(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        duplicateMsg, "ClientRMService",
        "Exception in submitting application", appId);
    throw RPCUtil.getRemoteException(duplicateMsg);
  }

  // Fall back to the default queue when none was requested.
  if (appContext.getQueue() == null) {
    appContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }

  // Fall back to the default application name when none was given.
  if (appContext.getApplicationName() == null) {
    appContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }

  // Default a missing application type; otherwise cap its length at
  // APPLICATION_TYPE_LENGTH characters by truncating.
  String requestedType = appContext.getApplicationType();
  if (requestedType == null) {
    appContext.setApplicationType(
        YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else if (requestedType.length()
      > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    appContext.setApplicationType(requestedType.substring(0,
        YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }

  try {
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(appContext,
        System.currentTimeMillis(), submitter, false, null);

    LOG.info("Application with id " + appId.getId() +
        " submitted by user " + submitter);
    RMAuditLogger.logSuccess(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", appId);
  } catch (YarnException ye) {
    LOG.info("Exception in submitting application with id " +
        appId.getId(), ye);
    RMAuditLogger.logFailure(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        ye.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw ye;
  }

  // The response carries no payload; returning normally signals acceptance.
  return recordFactory.newRecordInstance(SubmitApplicationResponse.class);
}
作业提交阶段,由ClientRMService调用rmAppManager的submitApplication完成:
/**
 * Registers a submitted (or recovered) application and kicks off its
 * lifecycle.
 *
 * <p>A recovered application that is already in a final state is sent a
 * RECOVER event synchronously and the method returns. Otherwise, with
 * security disabled a START (or RECOVER) event is dispatched directly; with
 * security enabled the credentials are parsed and the application is handed
 * to the delegation token renewer.
 *
 * @param submissionContext the client's submission context
 * @param submitTime the submission time passed on to the new application
 * @param user the submitting user
 * @param isRecovered whether this application is being recovered
 * @param state the recovered RM state; only read when {@code isRecovered}
 * @throws YarnException if the application cannot be created or its
 *         credentials cannot be parsed
 */
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user, boolean isRecovered, RMState state) throws YarnException {
  ApplicationId appId = submissionContext.getApplicationId();

  // Create the application and register it (including its ACLs).
  RMAppImpl app =
      createAndPopulateNewRMApp(submissionContext, submitTime, user);

  // Recovery path.
  if (isRecovered) {
    recoverApplication(state, app);
    RMAppState recoveredState =
        state.getApplicationState().get(appId).getState();
    if (isApplicationInFinalState(recoveredState)) {
      // We are synchronously moving the application into final state so that
      // momentarily client will not see this application in NEW state. Also
      // for finished applications we will avoid renewing tokens.
      app.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
      return;
    }
  }

  // Without security there are no tokens to handle: fire the start (or
  // recover) event right away.
  if (!UserGroupInformation.isSecurityEnabled()) {
    RMAppEventType kickoffType;
    if (isRecovered) {
      kickoffType = RMAppEventType.RECOVER;
    } else {
      kickoffType = RMAppEventType.START;
    }
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(appId, kickoffType));
    return;
  }

  // Secure cluster: parse the credentials shipped with the submission.
  Credentials parsedCredentials;
  try {
    parsedCredentials = parseCredentials(submissionContext);
  } catch (Exception e) {
    LOG.warn(
        "Unable to parse credentials.", e);
    // Sending APP_REJECTED is fine, since we assume that the
    // RMApp is in NEW state and thus we haven't yet informed the
    // scheduler about the existence of the application
    assert app.getState() == RMAppState.NEW;
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppRejectedEvent(appId, e.getMessage()));
    throw RPCUtil.getRemoteException(e);
  }

  // Hand the application and its credentials to the token renewer.
  // NOTE(review): no start event is dispatched on this path here; presumably
  // the renewer triggers it - confirm in DelegationTokenRenewer.
  this.rmContext.getDelegationTokenRenewer().addApplication(
      appId, parsedCredentials,
      submissionContext.getCancelTokensWhenComplete(), isRecovered);
}
application在下面函数中创建并加入相应集合,如果加入集合成功则代表作业提交成功
/**
 * Builds the {@code RMAppImpl} for a submission, registers it in the RM
 * context under its application id, and records its ACLs.
 *
 * @param submissionContext the client's submission context
 * @param submitTime the submission time
 * @param user the submitting user
 * @return the newly registered application
 * @throws YarnException if the resource request is invalid or an
 *         application with the same id is already registered
 */
private RMAppImpl createAndPopulateNewRMApp(
    ApplicationSubmissionContext submissionContext,
    long submitTime, String user)
    throws YarnException {
  ApplicationId appId = submissionContext.getApplicationId();
  validateResourceRequest(submissionContext);

  // The submission context supplies the name, queue and type for the app.
  String appName = submissionContext.getApplicationName();
  String queueName = submissionContext.getQueue();
  String appType = submissionContext.getApplicationType();
  RMAppImpl newApp =
      new RMAppImpl(appId, rmContext, this.conf, appName, user, queueName,
          submissionContext, this.scheduler, this.masterService,
          submitTime, appType);

  // Concurrent app submissions with same applicationId will fail here
  // Concurrent app submissions with different applicationIds will not
  // influence each other
  boolean registered =
      rmContext.getRMApps().putIfAbsent(appId, newApp) == null;
  if (!registered) {
    String duplicateMsg = "Application with id " + appId
        + " is already present! Cannot add a duplicate!";
    LOG.warn(duplicateMsg);
    throw RPCUtil.getRemoteException(duplicateMsg);
  }

  // Inform the ACLs Manager
  this.applicationACLsManager.addApplication(appId,
      submissionContext.getAMContainerSpec().getApplicationACLs());
  return newApp;
}
一个app包含的信息比较多,如下(RMAppImpl.java)
// Field declarations of RMAppImpl: the state held for one application.

// Class-wide logger.
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
// Placeholder text; presumably substituted where a value is not available -
// confirm at the use sites.
private static final String UNAVAILABLE = "N/A";


// Immutable fields
private final ApplicationId applicationId;
private final RMContext rmContext;
private final Configuration conf;
private final String user;
private final String name;
private final ApplicationSubmissionContext submissionContext;
private final Dispatcher dispatcher;
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
// The reference is final but the builder itself is mutable.
private final StringBuilder diagnostics = new StringBuilder();
private final int maxAppAttempts;
// NOTE(review): presumably the two halves of one read/write lock - confirm
// in the constructor.
private final ReadLock readLock;
private final WriteLock writeLock;
// Attempts keyed by attempt id; LinkedHashMap iterates in insertion order.
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
    = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
private final String applicationType;


// Mutable fields
private long startTime;
// Both finish times start at 0.
private long finishTime = 0;
private long storedFinishTime = 0;
private RMAppAttempt currentAttempt;
private String queue;
// Declared with the raw EventHandler type, hence the suppression.
@SuppressWarnings("rawtypes")
private EventHandler handler;
// Transition instances shared by every RMAppImpl (static).
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION =
    new AppFinishedTransition();


// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;


// Package-private and untyped; its role is not visible from this excerpt.
Object transitionTodo;

相关内容