Hadoop-2.2.0中文文档——MapReduce--写YARN应用


目的

这份文档在一个高层次上描述了为YARN实现新应用的方式。

概念和流

普遍的概念是一个 '一个应用提交客户端' 提交一个 '应用' 给 YARN资源管理器。客户端和服务端用 'ApplicationClientProtocol' 通信,若需要会首次通过ApplicationClientProtoco#getNewApplicationl获取一个新的 'ApplicationId' ,然后通过ApplicationClientProtocol#submitApplication提交 'Application' 以执行。作为 ApplicationClientProtocol#submitApplication 请求的一部分, 客户端需要提供充足的信息给 ResourceManager 去 '启动' 应用的应用的第一个容器,例如, ApplicationMaster。 你需要提供你的应用能运行的如关于本地文件/jar的信息,实际要执行的命令(有必要的命令行参数),任何Unix环境设置(可选)等。你需要为你的ApplicationManager描述Unix进程以便有效地执行。


YARN ResourceManager 然后会在一个分配的容器上启动(指定的) ApplicationMaster。ApplicationManager 被期望 用 'ApplicationMasterProtocol' 和 ResourceManager 通信。首先, ApplicationMaster 需要把自己注册到ResourceManager. 要完成非配给它的任务, ApplicationMaster 然后会通过 ApplicationMasterProtocol#allocate 请求接收容器. 一个容器分配给了它之后, ApplicationMaster 采用 ContainerManager#startContainer 与 NodeManager 通信 去为它的任务启动容器。作为启动容器的一部分, ApplicationMaster 必须指定 ContainerLaunchContext,这与 ApplicationSubmissionContext 相似, 包含诸如命令行说明,环境等的启动信息。一旦任务完成了, ApplicationMaster 必须通过 ApplicationMasterProtocol#finishApplicationMaster 告知 ResourceManager 关于它的任务的完成情况。

同时,客户端可以通过查询ResourceManager 或 直接查询 ApplicationManager (如果支持) 来监控应用的状态。如果需要,它会通过ApplicationClientProtocol#forceKillApplication 关闭应用。

接口

你最可能关注的接口是 :

  • ApplicationClientProtocol - Client<-->ResourceManager
  • 此协议提供给 想要与 ResourceManager 通信来启动一个新应用的客户端 (例如.  ApplicationMaster), 检查应用的状态或关闭应用。例如,一个 job-client (来自网管的 job 启动程序) 会使用这个协议。
  • ApplicationMasterProtocol - ApplicationMaster<-->ResourceManager
    此协议由  ApplicationMaster用于 去注册/注销 它自己 到/从  ResourceManager, 也从 Scheduler 请求资源以完成它的任务。
  • ContainerManager - ApplicationMaster<-->NodeManager
    此协议由 ApplicationMaster 告知 NodeManager 去启动/停止 容器,如需要获得容器更新的状态。

写一个简单的 Yarn 应用

写一个简单的客户端

  • 一个客户端需要做的第一步是连接 ResourceManager ,或者更确切地说是 ResourceManager 的接口 ApplicationsManager (AsM)。
  ApplicationClientProtocol applicationsManager; 
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress = 
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));             
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    configuration appsManagerServerConf = new Configuration(conf);
    appsManagerServerConf.setClass(
        YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
        ApplicationClientProtocol.class, rmAddress, appsManagerServerConf));    
  • 一旦一个到ASM的句柄获得了,客户端需要向 ResourceManager 请求一个新的 ApplicationId.
 GetNewApplicationRequest request = 
        Records.newRecord(GetNewApplicationRequest.class);              
    GetNewApplicationResponse response = 
        applicationsManager.getNewApplication(request);
    LOG.info("Got new ApplicationId=" + response.getApplicationId());
  • ASM 对一个新应用的返回中包含诸如 集群的最小/最大 资源容量 的集群信息。需要这些信息以便你可以正确地设置能启动ApplicationManager 的容器说明 。请转到 GetNewApplicationResponse 以获得更多细节。
  • 一个客户端的关键是设置定义了所有ResourceManager 需要启动ApplicationManager 的信息。一个客户端需要在上下文中按照如下设置:
    • Application Info: id, name
    • Queue, Priority info: 应用要提交给哪个Queue ,给应用分配的优先级。
    • User: 提交应用的用户
    • ContainerLaunchContext: 定义 ApplicationMaster 会启动和运行的容器的信息。如前所述, ContainerLaunchContext, 定义所有运行 ResourceManager 需要的信息,诸如本地资源(二进制,jar,文件等),安全标识,环境设置(CLASSPATH等)和要执行的命令。
  //创建一个新的 ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = 
        Records.newRecord(ApplicationSubmissionContext.class);
    //设置 ApplicationId 
    appContext.setApplicationId(appId);
    //设置 application 名称
    appContext.setApplicationName(appName);
    
    // 创建一个新的容器为AM容器 启动上下文
    ContainerLaunchContext amContainer = 
        Records.newRecord(ContainerLaunchContext.class);

    // 定义需要的本地资源
    Map<String, LocalResource> localResources = 
        new HashMap<String, LocalResource>();
    // 假设 ApplicationMaster 需要的jar包在 HDFS上的一个已知路径上可用,我们想让它对 
    // 在启动了的容器中的 ApplicationMaster 可用
    Path jarPath; // <- jar包 所在的已知路径
    FileStatus jarStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    // 设置资源的类型 - 文件或归档
    // 归档未解压到框架上的目标地址
    amJarRsrc.setType(LocalResourceType.FILE);
    // 设置资源的可见度
    // 设置最 private 的选项,例如,这个资源只对正运行应用的实例可见
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
    // 设置资源要被拷贝到工作目录的地址
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); 
    // 设置时间戳和文件长度以便框架能在复制完成后对本地文件做完整性检查 
    // 以确保它是客户端要在应用中使用的同样的资源
    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
    amJarRsrc.setSize(jarStatus.getLen());
    // 框架会在工作目录中创建一个符号链接,叫做 AppMaster.jar 
    // 这个链接会指向实际的文件
    // ApplicationMaster 如果需要引用jar 包,需要使用 符号链接文件名
    localResources.put("AppMaster.jar",  amJarRsrc);    
    // 设置本地资源到启动上下文
    amContainer.setLocalResources(localResources);

    // 设置启动上下文需要的环境
    Map<String, String> env = new HashMap<String, String>();    
    // 例如,我们要设置classpath
    // 假设我们的类或jar在命令行运行的工作目录作为本地资源可用
    // 我们会在路径后面追加 “.”
    // 默认情况下,所有的hadoop声明classpath 已经在$CLASSPATH中已经可用
    // 所以我们需要仔细,不要把它覆盖了
    String classPathEnv = "$CLASSPATH:./*:";    
    env.put("CLASSPATH", classPathEnv);
    amContainer.setEnvironment(env);
    
    // 在启动的容器上构建要执行的命令
    String command = 
        "${JAVA_HOME}" + /bin/java" +
        " MyAppMaster" + 
        " arg1 arg2 arg3" + 
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";                     

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    // 如需要,添加命令                

    // 在容器声明中设置命令数组
    amContainer.setCommands(commands);
    
    // 为容器定义需要的资源
    // 现在 YARN 仅支持内存,所以我们设置内存
    // 如果处理占用了给它分配的内存,它将被框架关闭
    // 被请求的内存应该小于集群的最大容量,所有的请求应该是多个最小的容量
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);
    
    // 在 ApplicationSubmissionContext 设置容器启动上下文
    appContext.setAMContainerSpec(amContainer);
  • 在设置处理结束后,客户端最终准备提交任务到ASM
 // 创建发给 ApplicationsManager 的请求
    SubmitApplicationRequest appRequest = 
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);

    // 提交应用到 ApplicationsManager
    // 成功返回一个合法的对象或由于失败而抛出异常,都忽略返回
    applicationsManager.submitApplication(appRequest);

  • 此刻, ResourceManager 已经在后台接受了应用,正在根据要求的说明分配一个容器,然后最终在分配的容器上设置和启动ApplicationMaster 。
  • 一个客户端跟踪实际任务的流程有多种方式。
    • 它能和 ResourceManager 通信并通过ApplicationClientProtocol#getApplicationReport 请求一个关于应用的报告。
 GetApplicationReportRequest reportRequest = 
          Records.newRecord(GetApplicationReportRequest.class);
      reportRequest.setApplicationId(appId);
      GetApplicationReportResponse reportResponse = 
          applicationsManager.getApplicationReport(reportRequest);
      ApplicationReport report = reportResponse.getApplicationReport();
从 ResourceManager 收到包含如下的ApplicationReport :
      • 普通应用信息: ApplicationId, 应用提交给的队列 ,提交应用的用户和应用的启动时间。
      • ApplicationMaster 细节: ApplicationMaster 运行在哪个主机上,rpc 端口(如有) 正在监听来自客户端的请求和客户端需要联系 ApplicationMaster 用的 词.
      • Application 跟踪信息: 如果应用支持一些格式的进度跟踪,它可以通过 ApplicationReport#getTrackingUrl 设置一个客户端可以监控进度的跟踪url。
      • ApplicationStatus:  ResourceManager 可以通过 ApplicationReport#getYarnApplicationState 看到的应用状态。如果 YarnApplicationState 被设置成 FINISHED, 客户端应该转到  ApplicationReport#getFinalApplicationStatus 去检查应用任务自身的实际 成功/失败。假设失败了,ApplicationReport#getDiagnostics 可能对失败抛出一些有用的信息。
    • 如果 ApplicationMaster支持, 客户端可以直接通过端口查找  ApplicationMaster自身的更新进程:从 ApplicationReport 获得rpc 端口信息。如果可以,也可用从报告中获得的跟踪url。
在一些情况下,由于一些原因应用运行了太长时间,客户端可能希望关闭应用。 ApplicationClientProtocol 支持 forceKillApplication 请求,允许一个客户端通过 ResourceManager  送一个杀死信号 给 ApplicationMaster 。一个 ApplicationMaster 如果是这样设计的,也可能支持一个通过它的rpc 层的关闭请求。
   KillApplicationRequest killRequest = 
        Records.newRecord(KillApplicationRequest.class);                
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);      

写一个 ApplicationMaster

  • ApplicationMaster 是job 的实际拥有着。它会被 ResourceManager 启动,通过客户端会被提供关于它要监管的和完成的job的所有必要信息和资源。
  • 由于 ApplicationMaster 是随一个容器启动,有可能和别的容器共享一个物理主机,假设是多租用的情况,除去所有别的问题,不能做任何如它能监听的端口被预设置了。
  • 当 ApplicationMaster 启动,通过环境,多个参数对其可用。这些包含 ApplicationMaster 容器的 ContainerId ,应用提交时间和关于NodeManager主机运行的 Application Master 的细节。参考 ApplicationConstants 可看参数名。
  • 和 ResourceManager 的所有交互需要一个 ApplicationAttemptId (可能失败,每个应用会有多次尝试). ApplicationAttemptId 能从 ApplicationMaster containerId 获得. 有帮助 api 可把从环境中获得的值转成对象 。
 Map<String, String> envs = System.getenv();
    String containerIdString = 
        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
    if (containerIdString == null) {
      // 容器id应该始终由框架设置在环境中
      throw new IllegalArgumentException(
          "ContainerId not set in the environment");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
  • 一个 ApplicationMaster 完全初始化自己之后,它需要通过 ApplicationMasterProtocol#registerApplicationMaster 注册到 ResourceManager 。ApplicationMaster 始终通过 ResourceManager 的接口 Scheduler 通信。
  // 连接 ResourceManager 的 Scheduler
   YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress = 
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));           
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    ApplicationMasterProtocol resourceManager = 
        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);

    //注册一个 AM 到 RM
    // 在注册请求中设置必要的信息
    // ApplicationAttemptId, 
    // app master 运行的主机
    // app master 从客户端接收请求的 rpc 端口
    // 客户端跟踪app master 进度的跟踪url 
    RegisterApplicationMasterRequest appMasterRequest = 
        Records.newRecord(RegisterApplicationMasterRequest.class);
    appMasterRequest.setApplicationAttemptId(appAttemptID);     
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

    // 注册返回有用,因为返回了关于集群的信息
    // 与在客户端中的 GetNewApplicationResponse 相似,
    // 它提供了ApplicationMaster 请求容器时需要的 集群的 最小/最大资源容量
    RegisterApplicationMasterResponse response = 
        resourceManager.registerApplicationMaster(appMasterRequest);
  • ApplicationMaster 发出自己还活着并运行着的心跳以保持通知给 ResourceManager。 ResourceManager 的时间延迟 由YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS默认配置设定。ApplicationMasterProtocol#allocate 调用 ResourceManager 为心跳计数也支持发送进度更新信息。因此,如果有一个合法的方式心跳调用ResourceManager,一个无容器请求的分配调用和进度信息将被更新。
  • 在任务需要的基础上, ApplicationMaster可以请求一系列的容器来运行它的任务。ApplicationMaster 必须用 ResourceRequest 类定义如下的容器说明:
    • 主机名: 如果容器被要求放置在一个特定的机架或主机上。'*' 可以用来代表任意主机。
    • 资源容量: 当前, YARN 只支持基于内存的资源需求,所以请求应该定义需要多少内存。这个值用MB定义并且必须小于集群的最大容量和一个现存的最小的容量。内存资源与机架容器上的物理内存相关。
    • 优先级: 当请求多个容器时,一个 ApplicationMaster 可能给每一个定义不同的优先级。例如, Map-Reduce ApplicationMaster 可能需要分配以较高的优先级给 Map 任务和一个较低的优先级给 Reduce 任务的容器.
    // 资源请求
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);

    // 设置主机需求
    // 需要哪个机架/主机
    // 对重要的应用有用
    // 数据本地化
    rsrcRequest.setHostName("*");

    // 设置请求的优先级
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);           

    // 设置需要的资源类型
    // 现在,只支持内存,所以我们设置内存
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    // 设置需要容器的数量
    // 匹配需求
    rsrcRequest.setNumContainers(numContainers);
定义了容器需要后, ApplicationMaster 必须创建一个 AllocateRequest 以发给 ResourceManager. AllocateRequest 包括:
  • 被请求的容器: 被来自ResourceManager 的 ApplicationMaster 请求的容器说明和容器数量。
  • 被释放的容器: 有些情况下,ApplicationMaster 可能已经请求了超过它需要的容器或由于失败而决定分配别的容器给它。在所有的情况下,如果 ApplicationMaster 释放这些容器回到 ResourceManager 以便他们能够重新分配给别的应用,这对集群是有益的。
  • ResponseId: 返回 id 会从从请求调用中返回。
  • 进度更新信息: ApplicationMaster 能够发送它的进度更新(范围是0到1) 给 ResourceManager.
   List<ResourceRequest> requestedContainers;
    List<ContainerId> releasedContainers    
    AllocateRequest req = Records.newRecord(AllocateRequest.class);

    // The response id set in the request will be sent back in 
    // the response so that the ApplicationMaster can 
    // match it to its original ask and act appropriately.
    req.setResponseId(rmRequestID);
    
    // Set ApplicationAttemptId 
    req.setApplicationAttemptId(appAttemptID);
    
    // Add the list of containers being asked for 
    req.addAllAsks(requestedContainers);
    
    // If the ApplicationMaster has no need for certain 
    // containers due to over-allocation or for any other
    // reason, it can release them back to the ResourceManager
    req.addAllReleases(releasedContainers);
    
    // Assuming the ApplicationMaster can track its progress
    req.setProgress(currentProgress);
    
    AllocateResponse allocateResponse = resourceManager.allocate(req);       
  • 从 ResourceManager发回的 AllocateResponse 提供了如下信息:
    • 重启标识: ApplicationMaster 可能与 ResourceManager 脱离同步的情况下。
    • 分配的容器: 已经分配给 ApplicationMaster 的容器.
    • Headroom: 集群中资源的 Headroom . 基于这个信息知道他的需要,一个 ApplicationMaster 可以做出明智的决定,例如重新分配二级任务利用当前容器的优先级,资源不被利用的话尽快释放等。
    • 完成了的容器: 一旦一个 ApplicationMaster 触发启动一个分配的容器,容器完成时它将从 ResourceManager 接到一个更新。ApplicationMaster 能查看完成了的容器的状态,采取适当的动作,如失败的话尝试重启一个特定的二级任务。
    • 集群节点的数量: 集群中可用主机的数量。
有一件事要注意的是,容器不会立刻被分配给 ApplicationMaster. 这并不是说 ApplicationMaster 应该持续请求需要的容器中未给的那些。一旦一个分配请求发出去了, ApplicationMaster 将会被根据集群大小,优先级和调度策略来分配容器。 当且仅当它的原预估改变了而需要额外的容器时,ApplicationMaster 才再次请求容器。
 // Retrieve list of allocated containers from the response 
    // and on each allocated container, lets assume we are launching 
    // the same job.
    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container."
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerState" + allocatedContainer.getState()
          + ", containerResourceMemory"  
          + allocatedContainer.getResource().getMemory());
          
          
      // Launch and start the container on a separate thread to keep the main 
      // thread unblocked as all containers may not be allocated at one go.
      LaunchContainerRunnable runnableLaunchContainer = 
          new LaunchContainerRunnable(allocatedContainer);
      Thread launchThread = new Thread(runnableLaunchContainer);        
      launchThreads.add(launchThread);
      launchThread.start();
    }

    // Check what the current available resources in the cluster are
    Resource availableResources = allocateResponse.getAvailableResources();
    // Based on this information, an ApplicationMaster can make appropriate 
    // decisions

    // Check the completed containers
    // Let's assume we are keeping a count of total completed containers, 
    // containers that failed and ones that completed successfully.                     
    List<ContainerStatus> completedContainers = 
        allocateResponse.getCompletedContainersStatuses();
    for (ContainerStatus containerStatus : completedContainers) {                               
      LOG.info("Got container status for containerID= " 
          + containerStatus.getContainerId()
          + ", state=" + containerStatus.getState()     
          + ", exitStatus=" + containerStatus.getExitStatus() 
          + ", diagnostics=" + containerStatus.getDiagnostics());

      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed 
        // -100 is a special case where the container 
        // was aborted/pre-empted for some reason 
        if (-100 != exitStatus) {
          // application job on container returned a non-zero exit code
          // counts as completed 
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();                                                        
        }
        else { 
          // something else bad happened 
          // app job did not complete for some reason 
          // we should re-try as the container was lost for some reason
          // decrementing the requested count so that we ask for an
          // additional one in the next allocate call.          
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as that has already 
          // been done by the ResourceManager/NodeManager. 
        }
        }
        else { 
          // nothing to do 
          // container completed successfully 
          numCompletedContainers.incrementAndGet();
          numSuccessfulContainers.incrementAndGet();
        }
      }
    }
  • 一个容器已经被分配给 ApplicationMaster 之后, 它需要为最终任务在其上运行而执行在ContainerLaunchContext设置中的相似的流程。一旦 ContainerLaunchContext 被定义了, ApplicationMaster 可以与 ContainerManager 通信来启动它的容器。
      
    //Assuming an allocated Container obtained from AllocateResponse
    Container container;   
    // Connect to ContainerManager on the allocated container 
    String cmIpPortStr = container.getNodeId().getHost() + ":" 
        + container.getNodeId().getPort();              
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);               
    ContainerManager cm = 
        (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf);     

    // Now we setup a ContainerLaunchContext  
    ContainerLaunchContext ctx = 
        Records.newRecord(ContainerLaunchContext.class);

    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    try {
      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      LOG.info(
          "Getting current user failed when trying to launch the container",
          + e.getMessage());
    }

    // Set the environment 
    Map<String, String> unixEnv;
    // Setup the required env. 
    // Please note that the launched container does not inherit 
    // the environment of the ApplicationMaster so all the 
    // necessary environment settings will need to be re-setup 
    // for this allocated container.      
    ctx.setEnvironment(unixEnv);

    // Set the local resources 
    Map<String, LocalResource> localResources = 
        new HashMap<String, LocalResource>();
    // Again, the local resources from the ApplicationMaster is not copied over 
    // by default to the allocated container. Thus, it is the responsibility 
          // of the ApplicationMaster to setup all the necessary local resources 
          // needed by the job that will be executed on the allocated container. 
      
    // Assume that we are executing a shell script on the allocated container 
    // and the shell script's location in the filesystem is known to us. 
    Path shellScriptPath; 
    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
    shellRsrc.setType(LocalResourceType.FILE);
    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
    shellRsrc.setResource(
        ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
    shellRsrc.setTimestamp(shellScriptPathTimestamp);
    shellRsrc.setSize(shellScriptPathLen);
    localResources.put("MyExecShell.sh", shellRsrc);

    ctx.setLocalResources(localResources);                      

    // Set the necessary command to execute on the allocated container 
    String command = "/bin/sh ./MyExecShell.sh"
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    ctx.setCommands(commands);

    // Send the start request to the ContainerManager
    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);
  • 如前所述, ApplicationMaster 将会从 ApplicationMasterProtocol#allocate 调用中获得完成了的容器的更新。它也能通过查询ContainerManager 监控它的启动容器的状态。
  GetContainerStatusRequest statusReq = 
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());

问题

我如何发布我的应用 jar包到 YARN 集群上的所有需要它的节点上?

你可以用 LocalResource 添加资源到你的应用请求。这回引发YARN 发布资源到 ApplicationMaster 节点上。如果资源是一个 tgz, zip, or jar - 你可以用 YARN 来解压。然后,所有你需要做的就是添加解压文件夹到你的classpath。例如,创建你的应用请求时:

  File packageFile = new File(packagePath);
    Url packageUrl = ConverterUtils.getYarnUrlFromPath(
        FileContext.getFileContext.makeQualified(new Path(packagePath)));

    packageResource.setResource(packageUrl);
    packageResource.setSize(packageFile.length());
    packageResource.setTimestamp(packageFile.lastModified());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    resource.setMemory(memory)
    containerCtx.setResource(resource)
    containerCtx.setCommands(ImmutableList.of(
        "java -cp './package/*' some.class.to.Run "
        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"))
    containerCtx.setLocalResources(
        Collections.singletonMap("package", packageResource))
    appCtx.setApplicationId(appId)
    appCtx.setUser(user.getShortUserName)
    appCtx.setAMContainerSpec(containerCtx)
    request.setApplicationSubmissionContext(appCtx)
    applicationsManager.submitApplication(request)

如你所见, setLocalResources 命令接受一个组资源名字。在你的应用cwd中名字成了引用链接,所以你只需用 ./包名/* 去指向结果。

说明:Java 的 classpath 参数非常敏感。确保你的语法正确。

一旦你的包发布到你的 ApplicationMaster, 你需要跟着和你的 ApplicationMaster 启动一个新容器(假设你希望资源发送你你的容器上)相同的流程 。这个代码是一样的。你只需要确保你给你的 ApplicationMaster 包路径(HDFS或本地),以便它能随容器一起发送资源路径。

我如何获得 ApplicationMaster 的 ApplicationAttemptId?

ApplicationAttemptId 会通过环境被传给 ApplicationMaster ,同时环境中的值会通过ConverterUtils 帮助函数被转化为一个ApplicationAttemptId。

我的容器被 Node Manager 杀掉了

这可能是由于使用的内存超出请求容器的内存大小。出现这种情况有很多原因。首先,看node manager 杀死你的容器时抛出的进度树。你可能会感兴趣的两件事是物理内存和虚拟内存。是否你的应用已经超出了物理内存限制。如果你正运行一个Java应用,你可以用 -hprof 来查看什么占了堆空间。如果你已经超过了虚拟内存,你可能需要增加集群配置变量值 yarn.nodemanager.vmem-pmem-ratio。

我如何包含本地库?

启动一个容器时在命令行设置 -Djava.library.path 会导致本地库被Hadoop 使用,不会被正确加载并导致错误。要明确的用 LD_LIBRARY_PATH 来代替。


相关内容