Hadoop-2.4.1学习之Map任务源码分析(下),hadoop-2.4.1map


      在Map任务源码分析(上)中,对MAP阶段的代码进行了学习,这篇文章文章将学习Map任务的SORT阶段。如果Reducer的数量不为0,则还需要进行SORT阶段,但从上面的学习中并未发现与MAP阶段执行完毕调用mapPhase.complete()类似的在SORT阶段执行完毕调用sortPhase.complete()的源码,那SORT阶段是在什么时候启动的?对于Map任务来说,有输入就有输出,输入由RecordReader负责,输出则由RecordWriter负责,当Reducer的数量不为0时,RecordWriter为NewOutputCollector(该类为MapTask的私有内部类),SORT阶段对map的输出进行处理,由此推断SORT阶段的工作是由NewOutputCollector完成的,下面将通过分析NewOutputCollector的源代码要验证这一推断是否成立。该类继承自RecordWriter,拥有的变量如下:

private final MapOutputCollector<K,V> collector;//负责实际的输出操作
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;//对键空间进行分区
private final int partitions;//分区数量,与Reducer的数量相同
      该类的构造函数如下:

collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}

      在该段代码中还是未发现与SORT阶段有关的任何信息,但却发现了Sorting,据此推断方法createSortingCollector具有最大的可能性。该方法的源代码如下:

//根据mapreduce.job.map.output.collector.class的值构建MapOutputCollector
//在未指定该参数值的情况,返回MapOutputBuffer对象
MapOutputCollector<KEY, VALUE> collector= (MapOutputCollector<KEY, VALUE>)
ReflectionUtils.newInstance.
(job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
MapOutputBuffer.class, MapOutputCollector.class), job);
LOG.info("Map output collector class = " + collector.getClass().getName());
MapOutputCollector.Context context =new MapOutputCollector.Context(this, job, reporter);
//默认调用MapOutputBuffer的init方法
collector.init(context);
return collector;

      经过上面的一系列分析可知,SORT阶段的工作由NewOutputCollector完成,而NewOutputCollector又将SORT工作交给了MapOutputCollector,最终由该接口的实现类MapOutputBuffer完成,该类做为MapTask的内部类占用了MapTask源代码中超过一半的行数(MapTask行数为2000行,MapOutputBuffer约为1100行),但从行数就可以得出该类的重要性。MapOutputBuffer的init方法的第一部分代码根据设置构建缓存,代码已经添加了相关注释:

//sanity checks
//当kvbuffer缓存达到该值时,溢出线程将缓存内容写入硬盘
final float spillper =job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
//用于排序文件的缓存的大小,默认为100MB
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
//mapreduce.task.index.cache.limit.bytes,默认值为1024 * 1024(1M)
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
     "\": " + spillper);
}
//sortmb的最大值为2047Mb(111 1111 1111),取sortmb的最低11位
//若大于2047Mb,下面的左移20位将导致溢出
if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
//默认使用快速排序
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
//sortbm*(2的20次方),将sortbm转换为字节数(1024*1024,2的十次方乘以2的十次方)
int maxMemUsage = sortmb << 20;
//METASIZE=16,maxMemUsage=sortmb << 20
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
setEquator(0);
//equator:标记元数据或者序列化数据的起源
//bufstart:标记溢出的起始位置,bufend:标记可收集收据的起始位置
//bufindex:标记已收集数据的结束位置。全部初始化为0
bufstart = bufend = bufindex = equator;
//kvstart:标记溢出元数据的起源,kvend:标记溢出元数据的结束位置
//kvindex:标记完全序列化的记录的结束位置
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;

      MapOutputBuffer的init方法的第二部分启动SpillThread线程,该线程用于完成SORT阶段的工作,并负责溢出缓存中的数据。在该线程中的run方法中调用了sortAndSpill方法,由方法名就可以得知该方法负责map输出的排序和溢出工作,排序部分的源代码如下:

final int mstart = kvend / NMETA;
// kvend is a valid record
final int mend = 1 + (kvstart >= kvend? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
//对指定范围的数据进行排序,默认使用的QuickSort
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

      对数据排完序后就需要将已排序数据写出到文件中,源代码如下:

int spindex = mstart;
//记录索引的startOffset、rawLength和partLength
final IndexRecord rec = new IndexRecord();
//封装value的字节表示的内部类
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
   //负责将map的输出写入中间文件
   IFile.Writer<K, V> writer = null;
   try {
       long segmentStart = out.getPos();
       writer = new Writer<K, V>(job, out, keyClass, valClass, codec,spilledRecordsCounter);
       if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
                key.reset(kvbuffer, keystart, valstart - keystart);
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
           }
        } else {
           int spstart = spindex;
           while (spindex < mend
&&kvmeta.get(offsetFor(spindex % maxRec)+ PARTITION) == i) {
                ++spindex;
           }
           // Note: we would like to avoid the combiner if we've fewer
           // than some threshold of records for a partition
           if (spstart != spindex) {
             combineCollector.setWriter(writer);
             RawKeyValueIterator kvIter =new MRResultIterator(spstart, spindex);
             combinerRunner.combine(kvIter, combineCollector);
            }
        }
       // close the writer
       writer.close();
       // record offsets
       rec.startOffset = segmentStart;
       rec.rawLength = writer.getRawLength();
       rec.partLength = writer.getCompressedLength();
       spillRec.putIndex(rec, i);
       writer = null;
   } finally {
       if (null != writer) writer.close();
   }
}

      当保存索引的缓存超过限制时,就将索引保存到文件中,源代码如下:

if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
 // create spill index file
//MAP_OUTPUT_INDEX_RECORD_LENGTH值为24,表示索引文件中每条记录的大小
 Path indexFilename =mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
 spillRec.writeToFile(indexFilename, job);
} else {
 indexCacheList.add(spillRec);
 totalIndexCacheMemory +=spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}

      综合上面的分析可知,当在map方法中执行context.write时,将先数据写入到缓存中,当缓存中的数据达到预先设置的阈值时由后台SpillThread线程负责数据排序并将数据溢出到map任务的中间输出文件中。

相关内容