Running simple-yarn-app on YARN


Version: Hadoop 2.2.0

The source can be downloaded from https://github.com/hortonworks/simple-yarn-app. I had tried this simple-yarn-app repeatedly without success, which was surprising: as YARN's "hello world" it should not be that hard to run. After quite a bit of debugging, the culprit turned out to be the classpath.

First, set up the environment as described in http://blog.csdn.net/fansy1990/article/details/22896249.

By a classpath problem I mean that Linux and Windows express the Java classpath differently. With the unmodified GitHub source (I submit jobs from Windows, which is what triggers the problem; submitting from Linux does not), a breakpoint shows the following classpath:

{CLASSPATH=$HADOOP_CONF_DIR;$HADOOP_COMMON_HOME/share/hadoop/common/*;$HADOOP_COMMON_HOME/share/hadoop/common/lib/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*;$HADOOP_YARN_HOME/share/hadoop/yarn/*;$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*;%PWD%\*}
With the modified source, it becomes:

{CLASSPATH=$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$PWD/*}
The differences are the semicolons versus colons, and the %VAR% versus $VAR variable syntax.
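To see the difference concretely, here is a minimal sketch (my own illustration, not part of the app) that prints what the stock helpers produce on the submitting machine versus the hard-coded Linux conventions used in the modified Client below. The stock Apps.addToEnvironment() joins entries with the client-side File.pathSeparator, and Environment.PWD.$() expands using the client's shell syntax, which is why a Windows client produces a classpath a Linux NodeManager cannot parse:

import java.io.File;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;

public class ClasspathSeparatorDemo {
  public static void main(String[] args) {
    // File.pathSeparator is ";" on Windows but ":" on Linux.
    System.out.println("client-side separator: " + File.pathSeparator);
    // Environment.PWD.$() expands to %PWD% on a Windows client but $PWD on
    // Linux, and File.separator is "\" versus "/".
    System.out.println("client-side PWD entry: "
        + Environment.PWD.$() + File.separator + "*");
    // The fix: hard-code the Linux conventions, because the containers always
    // run on the Linux cluster no matter where the client runs.
    System.out.println("fixed PWD entry:       $PWD" + Path.SEPARATOR + "*");
  }
}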

The Client source is as follows:

package com.hortonworks.simpleyarnapp;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Client {
  Logger log = LoggerFactory.getLogger(Client.class);
  Configuration conf = new YarnConfiguration();
  
  public void run(String[] args) throws Exception {
    final String command = args[0];
    final int n = Integer.valueOf(args[1]);
    final Path jarPath = new Path(args[2]);

    // Create yarnClient
    // YarnConfiguration extends Configuration
    // YarnConfiguration conf = new YarnConfiguration(); // now a field above
    conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();
    
    // Create application via yarnClient
    YarnClientApplication app = yarnClient.createApplication();
    
    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = 
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(
        Collections.singletonList(
            "$JAVA_HOME/bin/java" +
            " -Xmx256M" +
           /* " com.hortonworks.simpleyarnapp.Work" +*/
           " com.hortonworks.simpleyarnapp.ApplicationMaster" +
            " " + command +
            " " + String.valueOf(n) +
            " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + 
            " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" 
            )
        );
    
    // Setup jar for ApplicationMaster
    LocalResource appMasterJar = Records.newRecord(LocalResource.class);
    setupAppMasterJar(jarPath, appMasterJar);
    amContainer.setLocalResources(
        Collections.singletonMap("simpleapp.jar", appMasterJar));

    // Setup CLASSPATH for ApplicationMaster
    Map<String, String> appMasterEnv = new HashMap<String, String>();
    setupAppMasterEnv(appMasterEnv);
    amContainer.setEnvironment(appMasterEnv);
    
    // Set up resource type requirements for ApplicationMaster
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(256);
    capability.setVirtualCores(1);

    // Finally, set-up ApplicationSubmissionContext for the application
    ApplicationSubmissionContext appContext =
        app.getApplicationSubmissionContext();
    appContext.setApplicationName("simple-yarn-app"); // application name
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);
    appContext.setQueue("default"); // queue 

    // Submit application
    ApplicationId appId = appContext.getApplicationId();
    System.out.println("Submitting application " + appId);
    log.info("Submitting application " + appId);
    try {
      yarnClient.submitApplication(appContext);
    } catch (Exception e) {
      e.printStackTrace();
    }
   /* log.info("-----------------------------------");
    for(ApplicationReport appli:yarnClient.getApplications()){
    	log.info("appli.getApplicationType():"+appli.getApplicationType()+"\n"
    			+"appli.getHost():"+appli.getHost()+"\n"
    			+"appli.getOriginalTrackingUrl():"+appli.getOriginalTrackingUrl()+"\n"
    			+"appli.getTrackingUrl():"+appli.getTrackingUrl()+"\n"
    			+"appli.getUser():"+appli.getUser());
    }
    log.info("--------------------------------------");*/
    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
    YarnApplicationState appState = appReport.getYarnApplicationState();
    while (appState != YarnApplicationState.FINISHED && 
           appState != YarnApplicationState.KILLED && 
           appState != YarnApplicationState.FAILED) {
      Thread.sleep(100);
      appReport = yarnClient.getApplicationReport(appId);
      appState = appReport.getYarnApplicationState();
    }
    
    System.out.println(
        "Application " + appId + " finished with" +
        " state " + appState +
        " at " + appReport.getFinishTime());

  }
  
  private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
    FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
    appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    appMasterJar.setSize(jarStat.getLen());
    appMasterJar.setTimestamp(jarStat.getModificationTime());
    appMasterJar.setType(LocalResourceType.FILE);
    appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
  }
  private static void addToEnvironment(
        Map<String, String> environment,
        String variable, String value) {
      String val = environment.get(variable);
      String separator = ":";
      if (val == null) {
        val = value;
      } else {
        val = val + separator + value;
      }
      environment.put(StringInterner.weakIntern(variable), 
          StringInterner.weakIntern(val));
    }
  private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
      addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
          c.trim());
    }
   /* Apps.addToEnvironment(appMasterEnv,
        Environment.CLASSPATH.name(),
        Environment.PWD.$() + File.separator + "*");*/
    addToEnvironment(appMasterEnv,
            Environment.CLASSPATH.name(),
            "$PWD" + Path.SEPARATOR + "*");
  }
  
  public static void main(String[] args) throws Exception {
    Client c = new Client();
    String[] arg= {"/root/myShell.sh","1","hdfs://node31:9000/input/"};
    /*String[] arg= {"java","1","hdfs://node31:9000/input/"};*/
    c.run(arg);
  }
}

The ApplicationMaster source is as follows (this one appears to be unchanged from the original):

package com.hortonworks.simpleyarnapp;

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationMaster {

  static Logger log = LoggerFactory.getLogger(ApplicationMaster.class);
  
  public static void main(String[] args) throws Exception {
    final String command = args[0];
    final int n = Integer.valueOf(args[1]);
    if (log.isDebugEnabled()) {
      log.debug("Entering the ApplicationMaster");
    }
    // Initialize clients to ResourceManager and NodeManagers
    Configuration conf = new YarnConfiguration();
    /*conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");*/
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();

    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Register with ResourceManager
    System.out.println("registerApplicationMaster 0");
    rmClient.registerApplicationMaster("", 0, "");
    System.out.println("registerApplicationMaster 1");
    
    // Priority for worker containers - priorities are intra-application
    Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(0);

    // Resource requirements for worker containers
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(128);
    capability.setVirtualCores(1);

    // Make container requests to ResourceManager
    for (int i = 0; i < n; ++i) {
      ContainerRequest containerAsk = new ContainerRequest(capability, null, null, priority);
      System.out.println("Making res-req " + i);
      rmClient.addContainerRequest(containerAsk);
    }

    // Obtain allocated containers and launch 
    int allocatedContainers = 0;
    while (allocatedContainers < n) {
      AllocateResponse response = rmClient.allocate(0);
      for (Container container : response.getAllocatedContainers()) {
        ++allocatedContainers;

        // Launch container by creating a ContainerLaunchContext
        ContainerLaunchContext ctx = 
            Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(
            Collections.singletonList(
                command + 
               /* "$JAVA_HOME/bin/java" +*/
            
            /*"/bin/bash /root/myShell.sh" +*/
                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + 
                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" 
                ));
        System.out.println("Launching container " + allocatedContainers);
        nmClient.startContainer(container, ctx);
      }
      Thread.sleep(100);
    }

    // Now wait for containers to complete
    int completedContainers = 0;
    while (completedContainers < n) {
      AllocateResponse response = rmClient.allocate(completedContainers/n);
      for (ContainerStatus status : response.getCompletedContainersStatuses()) {
        ++completedContainers;
        System.out.println("Completed container " + completedContainers);
      }
      Thread.sleep(100);
    }

    // Un-register with ResourceManager
    rmClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, "", "");
  }
}

The Configuration in the Client needs the explicit conf.set(...) calls, whereas the AppMaster does not; presumably that is because the AM container already has the cluster's $HADOOP_CONF_DIR on its classpath, so new YarnConfiguration() picks up the right addresses by itself.
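If hard-coding the addresses feels wrong, a hedged alternative on the client side (my suggestion, with hypothetical file paths) is to add local copies of the cluster's configuration files to the Configuration, which is effectively what happens automatically inside the AM container:

Configuration conf = new YarnConfiguration();
// Hypothetical local copies of the cluster's config files.
conf.addResource(new Path("file:///etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("file:///etc/hadoop/conf/yarn-site.xml"));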

Compile the two classes above, package them into a jar, and place it under $HADOOP_HOME/share/hadoop/yarn/lib; that directory is part of the default YARN application classpath, so containers whose CLASSPATH is set up as above can find the classes.

Write the shell script: put the following in /root/myShell.sh:

#!/bin/bash

touc "/root/a.txt"
cho "oh ,it works !" > /root/a.txt

Note that the commands in this script are misspelled: touc should be touch and cho should be echo.

Then run the Client and check whether a.txt appears under /root. If it does, the shell script really was executed; if not, something is wrong. (Normally the file does get created; note that the output redirection on the second line creates it even though the misspelled command itself fails.) If you correct the misspellings, a.txt will also contain the text: oh, it works!

From this we know the shell commands actually failed, yet the ResourceManager log shows the application as successful, with no error message anywhere. So apparently this program merely launches the shell command and does not care whether it ran correctly or not?

Or is my shell script simply too simple, lacking any failure handling or fault-tolerance logic? Note that the AppMaster does redirect the command's output to log files:

ctx.setCommands(
            Collections.singletonList(
                command + 
               /* "$JAVA_HOME/bin/java" +*/
            
            /*"/bin/bash /root/myShell.sh" +*/
                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + 
                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" 
                ));

These redirections should capture the command's own output. ApplicationConstants.LOG_DIR_EXPANSION_VAR expands to the container's log directory on the NodeManager (under yarn.nodemanager.log-dirs, in an application_*/container_* subdirectory), so the command's stdout and stderr can be found there; whatever error the command prints should be visible in those files. But does the command communicate with the cluster's ResourceManager at all? If the ResourceManager could obtain the command's execution status, it ought to return that status as the job's final state.

The command runs inside a container, and the ResourceManager should be able to obtain the container's execution status. So is the problem that my shell script never signals anything back through the container?
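In fact the exit status is available, just not used: every ContainerStatus returned by the completion loop carries the exit code of the container's command. A hedged sketch of how the wait loop in ApplicationMaster.main() could be rewritten so the final application state reflects the shell's result (my own fix, not part of simple-yarn-app; rmClient and n are the variables defined above):

    // Wait for containers to complete, this time checking their exit status.
    int completedContainers = 0;
    int failedContainers = 0;
    while (completedContainers < n) {
      // Report real progress; the original completedContainers/n is integer
      // division and always passes 0.
      AllocateResponse response = rmClient.allocate((float) completedContainers / n);
      for (ContainerStatus status : response.getCompletedContainersStatuses()) {
        ++completedContainers;
        // Exit status 0 means the container's command succeeded; the
        // misspelled shell commands would show up here as a non-zero status.
        if (status.getExitStatus() != 0) {
          ++failedContainers;
          System.out.println("Container failed, exitStatus="
              + status.getExitStatus() + ", diagnostics: " + status.getDiagnostics());
        }
      }
      Thread.sleep(100);
    }

    // Let the final application state reflect what the containers actually did.
    rmClient.unregisterApplicationMaster(
        failedContainers == 0 ? FinalApplicationStatus.SUCCEEDED
                              : FinalApplicationStatus.FAILED,
        failedContainers + " of " + n + " containers failed", "");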

Separately, what if I drop the ApplicationMaster and instead launch an ordinary Java program, say one that just writes some data to HDFS, like this:

package com.hortonworks.simpleyarnapp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.io.Closeables;

public class Work {

  public static void main(String[] args) {
    String info = "first argument is:" + args[0] + "\n"
        + "second argument is :" + args[1];
    System.out.println("--------------------------------" + info);
    Configuration conf = new YarnConfiguration();
    conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");
    writeString(info, conf);
  }

  private static void writeString(String value, Configuration conf) {
    Path path = new Path("hdfs://node31:9000/input/work.info");
    FileSystem fs;
    FSDataOutputStream out = null;
    try {
      fs = FileSystem.get(path.toUri(), conf);
      out = fs.create(path);
      out.writeUTF(value);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      Closeables.closeQuietly(out);
    }
  }
}

Then change the Client's launch command to:

 amContainer.setCommands(
        Collections.singletonList(
            "$JAVA_HOME/bin/java" +
            " -Xmx256M" +
            " com.hortonworks.simpleyarnapp.Work" +
          /* " com.hortonworks.simpleyarnapp.ApplicationMaster" +*/
            " " + command +
            " " + String.valueOf(n) +
            " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + 
            " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" 
            )
        );
This does run Work, and Work's payload executes correctly (the data really is written to HDFS), but the job's final state comes back as failed, even though the submission itself clearly works. Presumably the ResourceManager expects whatever runs in the AM container to register via registerApplicationMaster(); since Work never does, the attempt eventually times out and is marked failed regardless of what the process accomplished.
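A hedged sketch of what Work would minimally need in order to finish with state SUCCEEDED when launched directly in the AM container (my assumption about the failure cause, based on the YARN AM protocol):

package com.hortonworks.simpleyarnapp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical variant of Work that honors the AM protocol.
public class RegisteringWork {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    // Tell the RM this attempt is alive; without this call the RM expires
    // the attempt and the application is reported as failed.
    rmClient.registerApplicationMaster("", 0, "");

    // ... do the real work here, e.g. the HDFS write from Work.main() ...

    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    rmClient.stop();
  }
}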

One more thing: if I keep the ApplicationMaster but change its container command to a java invocation of my Work class, that should work too. What actually happens is that the application finishes successfully, yet Work's payload never executes (no data is written to HDFS).
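My guess, again unverified: the worker containers launched by the ApplicationMaster get only a command string, with no jar localized and no CLASSPATH set, so "java ... Work" dies with a ClassNotFoundException that is only visible in the container's stderr, while the AM still counts the container as completed and reports success. A sketch of what the worker ContainerLaunchContext would additionally need; appJar and env are assumed to be built the way the Client's setupAppMasterJar() and setupAppMasterEnv() build them (hypothetical helper, not in the original code):

  // Give worker containers the same jar localization and CLASSPATH that the
  // Client gives the AM container. Requires java.util.Map and LocalResource
  // imports on top of those already in ApplicationMaster.
  private static ContainerLaunchContext workerContext(LocalResource appJar,
      Map<String, String> env, String command) {
    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
    // Without this, the container's working directory has no simpleapp.jar.
    ctx.setLocalResources(Collections.singletonMap("simpleapp.jar", appJar));
    // Without this, $CLASSPATH is empty and the JVM cannot find Work.
    ctx.setEnvironment(env);
    ctx.setCommands(Collections.singletonList(
        command
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    return ctx;
  }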


Share, grow, be happy.

Please credit this blog when reposting: http://blog.csdn.net/fansy1990



