Reduce Task的学习笔记,reducetask


            MapReduce五大过程已经分析过半了,上次分析完Map的过程,着实花费了我的很多时间,不过收获很大,值得了额,这次用同样的方法分析完了Reduce的过程,也算是彻底摸透了MapReduce思想的2个最最重要的思想了吧。好,废话不多,切入正题,在学习Reduce过程分析的之前,我特意查了书籍上或网络上相关的资料,我发现很大都是大同小异,缺乏对于源码的参照分析,所以我个人认为,我了可以在某些细节上讲得跟明白些,也许会比较好。因为Map和Reduce的过程的整体流程是非常相近的,如果你看过之前我写的Map Task的分析,相信你也能很快理解我的Reduce过程的分析的。Reduce过程的集中表现体现于Reduce Task中,Reduce Task与Map Reduce一样,分为Job-setup Task,  Job-cleanup Task, Task-cleanup Task和Reduce Task。我分析的主要是最后一个Reduce Task 。Reduce Task 主要分为5个阶段:

Shuffle------------------->Merge------------------->Sort------------------->Reduce------------------->Write

其中最重要的部分为前3部分,我也会花最多的时间描述前面3个阶段的任务。

       Shuffle阶段。我们知道,Reduce的任务在最最开始的时候,就是接收Map任务中输出的中间结果的数据,key-value根据特定的分区算法,给相应的Reduce任务做处理,所以这时需要Reduce任务去远程拷贝Map输出的中间数据了,这个过程就称作Shuffle阶段,所以这个阶段也称为Copy阶段。在Shuffle阶段中,有个GetMapEventsThread,会定期发送RPC请求,获取远程执行好的Map Task的列表,把他们的输出location映射到mapLocation中。

....
        	//GetMapEventsThread线程是远程调用获得已经完成的Map任务的列表
            int numNewMaps = getMapCompletionEvents();
            if (LOG.isDebugEnabled()) {
              if (numNewMaps > 0) {
                LOG.debug(reduceTask.getTaskID() + ": " +  
                    "Got " + numNewMaps + " new map-outputs"); 
              }
            }
            Thread.sleep(SLEEP_TIME);
          } 
进入getMapCompletionEvents方法,继续看:

...
        for (TaskCompletionEvent event : events) {
          switch (event.getTaskStatus()) {
            case SUCCEEDED:
            {
              URI u = URI.create(event.getTaskTrackerHttp());
              String host = u.getHost();
              TaskAttemptID taskId = event.getTaskAttemptId();
              URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                      "/mapOutput?job=" + taskId.getJobID() +
                                      "&map=" + taskId + 
                                      "&reduce=" + getPartition());
              List<MapOutputLocation> loc = mapLocations.get(host);
              if (loc == null) {
                loc = Collections.synchronizedList
                  (new LinkedList<MapOutputLocation>());
                mapLocations.put(host, loc);
               }
              //loc中添加新的已经完成的,mapOutputLocation,mapLocations是全局共享的
              loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
              numNewMaps ++;
            }
            break;
            ....
为了避免出现网络热点,Reduce Task对输出的位置进行了混洗的操作,然后保存到scheduleCopies中,后续的拷贝操作都是围绕着这个列表进行的。这个变量保存在了一个叫ReduceCopier的类里面。确认拷贝的目标位置,还只是Shuffle阶段的前半部分,这时看一下,执行的入口代码在哪里。回到Reduce Task的入口run()代码:

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    this.umbilical = umbilical;
    job.setBoolean("mapred.skip.on", isSkipping());

    if (isMapOrReduce()) {
      //设置不同阶段任务的进度
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    //创建Task任务报告,与父进程进行联系沟通
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    //判断是否使用的是新的额API
    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    //和map任务一样,Task有4种,Job-setup Task, Job-cleanup Task, Task-cleanup Task和ReduceTask
    if (jobCleanup) {
      //这里执行的是Job-cleanup Task
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      //这里执行的是Job-setup Task
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      //这里执行的是Task-cleanup Task
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    /*  后面的内容就是开始执行Reduce的Task */
    
    // Initialize the codec
    codec = initCodec();

    boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
    if (!isLocal) {
      reduceCopier = new ReduceCopier(umbilical, job, reporter);
      if (!reduceCopier.fetchOutputs()) {
    	  ......
到了reduceCopier.fetchOutps()这里必须停一步了,因为后面的Shuffle阶段和Merge阶段都在这里实现:

/**
     * 开启n个线程远程拷贝Map中的输出数据
     * @return
     * @throws IOException
     */
    public boolean fetchOutputs() throws IOException {
      int totalFailures = 0;
      int            numInFlight = 0, numCopied = 0;
      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
      final Progress copyPhase = 
        reduceTask.getProgress().phase();
      //单独的线程用于对本地磁盘的文件进行定期的合并
      LocalFSMerger localFSMergerThread = null;
      //单独的线程用于对内存上的文件进行进行定期的合并
      InMemFSMergeThread inMemFSMergeThread = null;
      GetMapEventsThread getMapEventsThread = null;
      
      for (int i = 0; i < numMaps; i++) {
        copyPhase.addPhase();       // add sub-phase per file
      }
      
      //建立拷贝线程列表容器
      copiers = new ArrayList<MapOutputCopier>(numCopiers);
      
      // start all the copying threads
      for (int i=0; i < numCopiers; i++) {
    	//新建拷贝线程,逐一开启拷贝线程
        MapOutputCopier copier = new MapOutputCopier(conf, reporter, 
            reduceTask.getJobTokenSecret());
        copiers.add(copier);
        //添加到列表容器中,并开启此线程
        copier.start();
      }
      
      //start the on-disk-merge thread
      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
      //start the in memory merger thread
      inMemFSMergeThread = new InMemFSMergeThread();
      //定期合并的2个线程也开启,也就是说copy阶段和merge阶段是并行操作的
      localFSMergerThread.start();
      inMemFSMergeThread.start();
      
      // start the map events thread
      getMapEventsThread = new GetMapEventsThread();
      getMapEventsThread.start();
      .....
在上面的代码中出现很多陌生的Thread的定义,这个可以先不用管,我们发现getMapEventsThread就是在这里开启的,去获取了最新的位置,位置获取完成当然是要启动很多的拷贝线程了,这里叫做MapOutputCopier线程,作者是把他放入一个线程列表中,逐个开启。看看里面的具体实现,他是如何进行远程拷贝的呢。

@Override
      public void run() {
        while (true) {        
          try {
            MapOutputLocation loc = null;
            long size = -1;
            
            synchronized (scheduledCopies) {
              //从scheduledCopies列表中获取获取map Task的输出数据的位置
              while (scheduledCopies.isEmpty()) {
            	//如果scheduledCopies我空,则等待
                scheduledCopies.wait();
              }
              //获取列表中的第一个数据作为拷贝的地址
              loc = scheduledCopies.remove(0);
            }
           
            CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
            readError = false;
            try {
              shuffleClientMetrics.threadBusy();
              //标记当前的map输出位置为loc
              start(loc);
              //进行只要的copy操作,返回拷贝字节数的大小
              size = copyOutput(loc);
              shuffleClientMetrics.successFetch();
              //如果进行到这里,说明拷贝成功吗,标记此error的标记为NO_ERROR
              error = CopyOutputErrorType.NO_ERROR;
            } catch (IOException e) {
              //抛出异常,做异常处理
              ....
从location列表中去取出,然后进行拷贝操作,核心方法在copyOutput(),接着往里跟踪:

.....
        // Copy the map output
        //根据loc Map任务的数据输出位置,进行RPC的拷贝
        MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
                                           reduceId.getTaskID().getId());
继续往里:

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                     Path filename, int reduce)
      throws IOException, InterruptedException {
        // Connect
    	//打开url资源定位符的连接
        URL url = mapOutputLoc.getOutputLocation();
        URLConnection connection = url.openConnection();
        
        //得到远程数据的输入流
        InputStream input = setupSecureConnection(mapOutputLoc, connection);
 
        ......
        //We will put a file in memory if it meets certain criteria:
        //1. The size of the (decompressed) file should be less than 25% of 
        //    the total inmem fs
        //2. There is space available in the inmem fs
        
        // Check if this map-output can be saved in-memory
        //向ShuffleRamManager申请内存存放拷贝的数据,判断内存是否内存是否装得下,装不下则放入DISK磁盘
        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 

        // Shuffle
        MapOutput mapOutput = null;
        if (shuffleInMemory) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into RAM from " + mapOutputLoc.getTaskAttemptId());
          }

          //如果内存装得下,则将输入流中的数据放入内存
          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                      (int)decompressedLength,
                                      (int)compressedLength);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
          }
          
          //装不下,则放入文件中
          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength);
        }
            
        return mapOutput;
      }
在这里我们看到了,Hadoop通过URL资源定位符,获取远程输入流,进行操作的,在拷贝到本地的时候,还分了2种情况处理,当当前的内存能方得下当前数据的时候,放入内存中,放不下则写入到磁盘中。这里还出现了ShuffleRamManager的用法。至此,Shuffle阶段宣告完成。还是比较深的,一层,又一层的。

       Merger阶段。Merge阶段其实是和Shuffle阶段并行进行的,刚刚也看到了,在fetchOutputs中,这些相关进程都是同时开启的,

public boolean fetchOutputs() throws IOException {
      int totalFailures = 0;
      int            numInFlight = 0, numCopied = 0;
      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
      final Progress copyPhase = 
        reduceTask.getProgress().phase();
      //单独的线程用于对本地磁盘的文件进行定期的合并
      LocalFSMerger localFSMergerThread = null;
      //单独的线程用于对内存上的文件进行进行定期的合并
      InMemFSMergeThread inMemFSMergeThread = null;
      ....
Merge的主要工作就是合并数据,当内存中或者磁盘中的文件比较多的时候,将小文件进行合并变成大文件。挑出其中的一个run方法
....
      public void run() {
        LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
        try {
          boolean exit = false;
          do {
            exit = ramManager.waitForDataToMerge();
            if (!exit) {
              //进行内存merger操作
              doInMemMerge();
目的非常明确,就是Merge操作,这是内存文件的合并线程的run方法,LocalFSMerger与此类似,不分析了。这个Mergr处理是并与Shuffle阶段的。在这里这2个阶段都完成了。还是有点复杂的。下面是相关的一些类关系图,主要要搞清4个线程是什么作用的。


4个线程的调用都是在ReduceCopier.fetchOutput()方法中进行的。在Shuffle,Merge阶段的后面就来到了,Sort阶段。

       Sort阶段,的任务和轻松,就是完成一次对内存和磁盘总的一次Merge合并操作,其中还会对其中进行一次sort排序操作。

....
    //标识copy操作已经完成
    copyPhase.complete();                         // copy is already complete
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);

    //进行内存和磁盘中的总的merge阶段的操作,Sort包含其中执行
    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
    RawKeyValueIterator rIter = isLocal
      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
          reporter, spilledRecordsCounter, null)
      : reduceCopier.createKVIterator(job, rfs, reporter);
那么Sort操作在哪里呢,就在最下面的createKVIterator中:

private RawKeyValueIterator createKVIterator(
        JobConf job, FileSystem fs, Reporter reporter) throws IOException {

      .....
      //在Merge阶段对所有的数据进行归并排序
      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
          if (o1.getLength() == o2.getLength()) {
            return 0;
          }
          return o1.getLength() < o2.getLength() ? -1 : 1;
        }
      });

      // build final list of segments from merged backed by disk + in-mem
      List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
,Sort阶段的任务就是这么简单。下面看一下前3个阶段主要的执行流程,这3个阶段构成了Reduce Task的核心。



       Reduce阶段,跟随这个图的执行方向,接下来我们应该执行的是key-value的reduce()函数了,没错就是循环键值对,执行此函数

....
    //判断执行的是新的API还是旧的API
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
在这里我们执行的就是runReducer方法了,我们往老的API跳:

  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runOldReducer(JobConf job,
                     TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass) throws IOException {
    Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
      ReflectionUtils.newInstance(job.getReducerClass(), job);
    // make output collector
    String finalName = getOutputName(getPartition());

    //获取输出的key,value
    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
        reduceOutputCounter, job, reporter, finalName);
    
    OutputCollector<OUTKEY,OUTVALUE> collector = 
      new OutputCollector<OUTKEY,OUTVALUE>() {
        public void collect(OUTKEY key, OUTVALUE value)
          throws IOException {
          //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果
          out.write(key, value);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };
    
    // apply reduce function
    try {
      //increment processed counter only if skipping feature is enabled
      boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
        SkipBadRecords.getAutoIncrReducerProcCount(job);
      
      //判断是否为跳过错误记录模式
      ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
          new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
              comparator, keyClass, valueClass, 
              job, reporter, umbilical) :
          new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
          job, reporter);
      values.informReduceProgress();
      while (values.more()) {
        reduceInputKeyCounter.increment(1);
        //Record迭代器中获取每一对,执行用户定义的Reduce函数,此阶段为Reduce阶段
        reducer.reduce(values.getKey(), values, collector, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
        }
        //获取下一个key,value
        values.nextKey();
        values.informReduceProgress();
      }
     //...
和Map Task的过程很类似,也正如我们预期的那样,循环迭代执行,这就是Reduce阶段。

        Write阶段。Write阶段是最后一个阶段,在用户自定义的reduce中,一般用户都会调用collect.collect方法,这时候就是写入的操作了。这时的写入就是将最后的结果写入HDFS作为最终结果了。这里先定义了OutputCollector的collect方法:

OutputCollector<OUTKEY,OUTVALUE> collector = 
      new OutputCollector<OUTKEY,OUTVALUE>() {
        public void collect(OUTKEY key, OUTVALUE value)
          throws IOException {
          //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果
          out.write(key, value);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };
至此,完成了Reduce任务的所有阶段。下面是一张时序图,便于理解:


掌握了Map ,Reduce2个过程核心实现的过程将会帮助我们更加理解Hadoop作业运行的整个流程。整个分析的过程也许会有点枯燥,但是苦中作乐。


相关内容