Hadoop-2.4.1学习之Map任务源码分析(上),hadoop-2.4.1map


      众所周知,Mapper是MapReduce编程模式中最重要的环节之一(另一个当然是Reducer了)。在Hadoop-2.x版本中虽然不再有JobTracker和TaskTracker,但Mapper任务的功能却没有变化,本篇文章将结合源代码深入分析Mapper任务时如何执行的,包括处理InputSplit,mapper的输出、对输出分类等。在进行分析之前先明确几个概念:作业、任务、任务的阶段和任务的状态,可以将作业理解为要最终实现的功能或目的,比如统计单词的数量,而任务就是对该作业的拆分,只负责一部分作业,比如在统计单词数量的例子中,将一个作业交由10个任务去完成。任务的阶段指的是当前任务在执行什么功能,比如map和分类功能,在hadoop中一个任务的阶段由枚举类Phase定义,具体有6个阶段:STARTING、MAP、SHUFFLE、SORT、REDUCE、CLEANUP。任务的状态指的是该任务所处于的状态,比如运行中,失败等,具体由枚举类State定义:RUNNING、SUCCEEDED、FAILED、UNASSIGNED、KILLED、COMMIT_PENDING、FAILED_UNCLEAN、KILLED_UNCLEAN。

      在hadoop中map任务是由类MapTask表示的,该类提供了众多的内部类用于完成map任务,比如读取输入,收集输出等。在该类的开头语句块中定义了map任务的阶段:

{   // set phase for this task
    setPhase(TaskStatus.Phase.MAP); 
    getProgress().setStatus("map");
}

      该语句将任务的阶段设置为MAP,任务的初始阶段为STARTING。MapTask中的run方法用于执行任务,该方法中与map任务相关的源代码为(省略了作业清理、作业安装、任务清理的源代码):

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
}
//实现了Runnable和Report接口,用于报告进度、更新计数器和状态信息等
    TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
//初始化OutputFormat、OutputCommitter,创建任务临时输出目录
    initialize(job, getJobID(), reporter, useNewApi);
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

      该方法的TaskUmbilicalProtocol参数是用于任务子进程与其父进程的通信协议。在该方法体中,首先判断该任务是不是map任务(总是true),然后根据Reducer的数量决定是否将整个map任务进度分割为MAP(66.7%)和SORT(33.3%)阶段,二者的和必须为1,也就是说在整个map任务的进行过程中,66.7%的时间用于MAP阶段,33.3%的时间用于SORT阶段(将任务执行的进度与任务执行的时间对等考虑)。接着根据是否使用新版本的API调用不同的方法(hadoop-1.x和hadoop-2.x使用的是新版本的API,hadoop-0.x使用的旧版本的API),此处将只考虑使用新版本API的情况,也就是将调用runNewMapper方法,该方法执行实际的任务工作,前半部分用于实例化TaskAttemptContext taskContext、Mapper mapper、InputFormat inputFormat、InputSplit split、RecordReader input、Mapper.Context mapperContext等对象,且根据Reducer的数量创建不同的output对象,源代码如下:

org.apache.hadoop.mapreduce.RecordWriter output = null;
   if (job.getNumReduceTasks() == 0) {
      //直接输出map的结果
      output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

      在上述对象中,mapper和inputFormat为由作业调用setMapperClass和setInputFormatClass设置的类,split表示数据块,input用于从InputSplit中读取键值对,mapperContext为map方法中Context对象。在实例化完毕上述对象后,将进行实际的map过程,源代码如下:

try {
      input.initialize(split, mapperContext);
      //调用Mapper任务的run方法
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }

      其中的mapper.run(mapperContext)的源代码如下所示。由上面的分析可知,mapper为自定义的Mapper类,而在mapper的run方法中将调用该类的map方法以完成map任务,在该方法中对分配给该mapper的InputSplit中的数据循环调用map方法。

setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
      至此完成了MAP阶段的任务,并将任务的阶段设置为SORT,并更新任务状态,然后关闭输入和输出。由于SORT阶段的代码比较长(占整个MapTask的一半以上),所以将在后续的文章中进行学习。

相关内容