Hadoop任务提交过程,hadoop任务提交



Hadoop任务提交分析


分析工具和环境

下载一份hadoop的源码,这里以hadoop-1.1.2为例。本地IDE环境为eclipse,导入整个目录,然后可以在IDE里面看到目录结构了,要分析任务提交过程,需要找到入口代码,很明显,对于熟悉Hadoop应用程序开发的同学来说很容易的知道任务的提交是从job的配置开始的,所以需要这样一份提交Job的代码,在src/examples里面你可以找到一些例子,这里从最简单的wordcount进入去分析提交过程。

核心的一些类

Configuration Configuration类是整个框架的配置信息的集合类,里面主要包含了overlaypropertyresources,而主要的方法主要看getset,下面具体看一下这几个变量和方法overlayprivate Properties overlay;,其实overlay是Properties对象,用来存储key-value pair,但是它的主要作用还在于在添加Resource后,保证Resources重载后用户通过set函数设置的属性对会覆盖Resource中相同key的属性对。 properties: 用来存储key-value pair finalParameters:是一个set,用来保存final的配置信息,即不允许覆盖 addResourceObject会添加资源并调用reloadConfiguration,在reloadConfiguration中会清空properties对象和finalParameter,然后下次通过get去读某个属性的时候,会由于properties为null而,重新载入resources,这样资源得到更新,并且,在这之后立马会将overlay里面的内容覆盖进入properties,就是将之前通过set函数设置的属性对去覆盖resource中的默认属性对。

  private synchronized void addResourceObject(Object resource) {
    resources.add(resource);                      // add to resources
    reloadConfiguration();
  }

  public synchronized void reloadConfiguration() {
    properties = null;                            // trigger reload
    finalParameters.clear();                      // clear site-limits
  }

  public String get(String name) {
    return substituteVars(getProps().getProperty(name)); //这里先是getProps得到properties对象然后调用getProperty得到value,然后通过
                                                                                 //substitudeVars函数去替换value中的${..},其实就是引用变量
  }

  private synchronized Properties getProps() {
    if (properties == null) {    //properties为null会重新实例化properties,然后载入资源,并将overlay里面的内容去覆盖properties里面相同key
      properties = new Properties();          //的内容
      loadResources(properties, resources, quietmode);
      if (overlay!= null) {
        properties.putAll(overlay);
        for (Map.Entry<Object,Object> item: overlay.entrySet()) {
          updatingResource.put((String) item.getKey(), UNKNOWN_RESOURCE);
        }
      }
    }
    return properties;
  }

Job

Job类是hadoop任务配置的对象,继承于JobContext,而他们的构造函数都其实是以Configuration对象作为参数,然后提供了一些设置Job配置信息或者获取Job配置信息的一些函数。

上述的配置Job信息的函数并不是直接使用了Configuration对象,而是使用了Configuration对象的子类JobConf,JobContext以conf = JobConf(conf)作为内置成员变量。JobConf是在Confguration的基础上提供了Job配置的一些函数

Job类配置好类以及输入输出等之后,会调用Job.waitforcompletion当然也可以直接调用submit函数,其中waitForCompletion会调用submit然后jobClient.monitorAndPrintJob打印和监视Job的运行进度和情况。

submit函数分析

public void submit() throws IOException, InterruptedException, 
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect(); //主要是生成了jobClient对象,而在该对象的初始化过程中,主要生成 了jobSubmitClient的一个动态代理,能够RPC调用
                        //JobTracker中的一些函数(分布式的情况,本地情况先不做分析)
    info = jobClient.submitJobInternal(conf); // 任务提交的过程
    super.setJobID(info.getID());
    state = JobState.RUNNING;
   }

jobClient构造函数分析

//jobClient的构造函数,设置了conf,然后就调用了init函数
public JobClient(JobConf conf) throws IOException {
    setConf(conf);
    init(conf);
  }

  /**
   * Connect to the default {@link JobTracker}.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
    this.ugi = UserGroupInformation.getCurrentUser();
    if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf); //本地模式下,会生成一个LocalJobRunner对象
    } else {
      this.rpcJobSubmitClient = 
          createRPCProxy(JobTracker.getAddress(conf), conf); // 分布式模式下,会生成了一个RPC代理
      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf); //对这个代理再次封装了一下,其实里面主要是增加了retry的一些代码
    }        
  }

jobClient.submitJobInternal函数分析

public 
  RunningJob submitJobInternal(final JobConf job
                               ) throws FileNotFoundException, 
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */
    return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws FileNotFoundException, 
      ClassNotFoundException,
      InterruptedException,
      IOException{
        JobConf jobCopy = job;
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy); //获得任务提交的目录,其实是作为参数的JobClient去通过RPC从JobTracker那里获取得到jobStagingArea
                            // ${hadoop.tmp.dir}/mapred/staging/user-name/.staging目录
        JobID jobId = jobSubmitClient.getNewJobId();  //从JobTracker那里获得分配的jobid
        Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //submitJobDir = ${jobStagingArea}/job-id
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;
        try {
          populateTokenCache(jobCopy, jobCopy.getCredentials());

          copyAndConfigureFiles(jobCopy, submitJobDir); // 这个函数主要是负责处理通过参数--libjars,--archives,--files引入的jar包,以及要使用分布式缓存的文件和archive,并且将job.jar也拷贝到分布式文件系统上

          // get delegation token for the dir
          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext context = new JobContext(jobCopy, jobId);

        //检查output,如果已经存在则会抛出异常
          // Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

          jobCopy = (JobConf)context.getConfiguration();

        //利用设置好的inputformat的来对输入进行分片,然后将分片写入到job.split中,然后也会将分片的元信息写入到job.splitmetainfo中
        //
          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());


            //将job配置相关的信息写入到job.xml中
          // Write job file to JobTracker's fs        
          FSDataOutputStream out = 
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          try {
            jobCopy.writeXml(out);  //正式写入
          } finally {
            out.close();
          }
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, jobCopy.getCredentials());
          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials()); //通过RPC提交任务给JobTracker,让其去管理任务,在JobTrakcker的
                                                                                                //subnitJob里面会实例化JobInProgress并保存在一个jobs的Map<jobID,JobInProgress>的map里面
          JobProfile prof = jobSubmitClient.getJobProfile(jobId); //获得任务的概要信息,主要包括了jobname,jobid,提交的队列名,job配置信息的路径,用户,web接口的url等
          if (status != null && prof != null) {
            return new NetworkedJob(status, prof, jobSubmitClient); //对status,prof,以及jobSubmitClient的封装,主要用来可以查询进度信息和profile信息。而status里面提供了getMapProgress和setMapProgress类型的接口函数
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (fs != null && submitJobDir != null)
              fs.delete(submitJobDir, true);
          }
        }
      }
    });
  }

JobClient

从上面的一些代码也可以看出真正提交Job是在JobClient做的,它主要处理了一些参数像libjars files archives把他们指定的文件拷贝到HDFS的submitJobDir,然后把JOb的配置信息job.xml , 分片信息以及job.jar也都拷贝到HDFS上。然后利用jobClient里面的一个代理类jobSubmitClient来提交任务,其实就是一个RPC请求,向JObTracker发送请求,然后返回的是一个JobStatus类型的对象,该对象可以查询map reduce进度等。


下一篇讲一下JObTracker相关


hadoop程序到底怎写,作业怎提交

提交指令应该:bin/hadoop jar b.jar wordcount /input /output,其中jar是文件类型,b.jar文件名,wordcount主类名
 

hadoop 用eclipse提交Mapreduce任务,每次都报错

问题:
你使用的是hdfs,但你在Configuration中只设置了mapred.job.tracker值,这个是jobtracker的地址,你需要设置namenode的地址。而放到集群时在new Configuration时会自动加载集群的配置文件,如core-site.xml,hdfs-site.xml等
解决方法:
1)直接调用Configuration的set方法为fs.default.name设置值,值为namenode地址
2)直接将集群的三个*-site.xml配置文件放入项目的classpath下,简单方便
 

相关内容