MapReduce源码分析之MapTask分析(二),mapreducemaptask


SpillThread分析

为什么需要Spill

         内存大小总是有效,因此在Mapper在处理过程中,数据持续输出到内存中时,必然需要有机制能将内存中的数据换出,合理的刷出到磁盘上。SpillThread就是用来完成这部分工作。

         SpillThread的线程处理函数只是做一层封装,当索引表中的kvstart和kvend指向一样的索引位置时,会持续处于等待过程,等待外部通知需要触发spill动作,当有spill请求时,会触发StartSpill来唤醒SpillThread线程,进入到sortAndSpill。

    下面就是SpillThread线程体函数。

    protected class SpillThread extends Thread {
      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
			// 等待被唤醒
              spillReady.await();
            }
            try {
              spillLock.unlock();
			// spill处理
              sortAndSpill();
            } catch (...) {
			...
            } finally {
              spillLock.lock();
			// 重置索引区,更新buf缓冲区的尾部位置信息
              if (bufend < bufindex && bufindex < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

线程函数内的处理逻辑比较简单,主要分为三个步骤:

1.等待唤醒

2.对内存中的数据进行排序并将数据溢出写入到磁盘,这部分内部分析见下文。

3.重置索引区和缓存区的end标记

sortAndSpill

内存数据的溢出处理是有此函数进行封装,下面我们将该函数按块进行详细分析。

    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
		// part1
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);
		
		// part2
        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
	
		
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
			// part3
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
			// part4
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - 
                           kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;
              }
            } else {
			// part5
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }
		
			// part6
            // close the writer
            writer.close();
			
            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }
		
		// part7
        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

part1:创建SpillRecord,创建文件流

    SpillRecord是一个记录集,用于记录分区在数据文件中的文件起始位置,原始长度,压缩后的长度信息。

    SpillRecord的成员只有两个。一个是buf,长度为分区个数*每条分区索引信息占用的长度,另一个是为记录方便转换成的LogBuffer。

    每条分区索引信息占用的长度由MAP_OUTPUT_INDEX_RECORD_LENGTH来表示,占用24个字节,即3个Long。

  public SpillRecord(int numPartitions) {
    buf = ByteBuffer.allocate(
        numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
    entries = buf.asLongBuffer();
  }

    创建文件流,文件的路径是根据numspill来产生的,第一个溢出的文件就是spill0.out,以此类推后续的溢出的文件就是spill0.out,spill1.out ...

    在每次满足溢出的时候,都会产生一个溢出的文件,这些溢出的文件最后会在处理完Mapper在最后的flush阶段触发merge动作,将所有溢出的文件进行合并为一个文件。


part2:数据排序

    获取溢出的处理的索引区间的尾部位置,这个索引区间是有kvstart,kvend所标识出来,kvstart记录了索引区开始使用的起始位置,kvend记录了索引区使用的结束位置。这一段索引区所指向的数据缓冲区就是需要被处理刷入到文件的。在上文,我们提到了因为是循环缓冲区,索引在没有到缓冲区尾部时是kvstart<kvend,当kvend走到尾循环回来,kvstart>kvend。

在排序时,为处理简单,指定出一个统一的区间,使用endpostion表示出尾部位置。当kvend在前,endposition为kvoffsets的长度+kvend。

    MapReduce的核心是对数据排序,在MapTask需要对每次溢出的数据按分区进行排序,保证分区内的数据是有序的,分区从小到大递增。排序的工作是由sorter完成,排序在内存中排列完成。

    sorter是一个IndexedSorter类型,在MapOutputBuffer初始化时从conf中获取map.sort.class所指定的sort类,默认是使用QuickSort。我们截取部分排序函数的部分代码,来分析排序过程。

private static void sortInternal(final IndexedSortable s, int p, int r,
      final Progressable rep, int depth) {
    if (null != rep) {
      rep.progress();
    }
    while (true) {
    if (r-p < 13) {
	//p为其实位置,r为结束位置,s为MapOutputBuffer
      for (int i = p; i < r; ++i) {
        for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
          s.swap(j, j-1);
        }
      }
      return;
    }
	....
}

    sort的关键两步就是key之间比较,和交换。compare使用的和swap调用的都是MapOutputBuffer中的两个函数,先看compare函数,comapre传入的是两个kvoffsets索引区的两个index,因为endposition有可能是大于kevoffsets的长度,因此在取真实index的时候,需要对kvoffsets的长度进行取余。比较会先取出kvoffsets中的值,再通过该值定位到k,v在二级索引区kvindices中记录的k,v所属的分区,在kvbuffer的位置,长度。排序优先级为,低分区->高分区,分区一样则根据key排序。

    当符合条件,使用swap函数,交换kvoffsets中记录kvindices的索引值,因此排序的开销很小,不需要每次移动key,仅通过kvoffsets就完成比较排序。

    public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort by partition
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }
	
    public void swap(int i, int j) {
      i %= kvoffsets.length;
      j %= kvoffsets.length;
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }

上图就是排序前后的变化过程。排序前kvbuffer中有key为字符串,value为int值。

第一个item的key为"ba",第二个item的key为"aa",所属分区都为分区1,按照字典序排序"aa","ba"。排序后,二级索引kvindices和kvbuffer都没有变动,只是在一级索引kvoffsets中交换指向,在kvoffsets[0]=1指向"aa",kvoffsets[1]=0指向"ba"。

part3: IFile数据格式

    IFile是一种存储格式,用于表示MapTask在处理数据溢出到磁盘文件,数据在磁盘文件中以什么形式组织。存储形式为如下

KeyLength

valueLength

key

Value

KeyLength

valueLength

key

Value

EOF_MARKER

EOF_MARKER

4字节CRC

 

每个key,value输出到文件中,都会以上述keylength,valuelength,key,value的形式逐个排列,在close时,会输出两个标记,非别是key,value长度的标记,标记为-1表示key,value输出结束,在尾部会后一个针对整个文件的crc校验码。

 

IFile类内有两个子类,分别是Reader,Writer用于读取和写入IFile文件。 

Writer的内部成员:

	//用于io操作的输出流,基于checksum的流产生
    FSDataOutputStream out;
	//记录原始的输出流,也就是第一部分中产生的文件流
    FSDataOutputStream rawOut;
	//基于文件流产生的checksum输出流,特点是write时内部会做crc
    IFileOutputStream checksumOut;

	//key,value的序列化,和"核心成员变量中的key,value序列化类一样的功能"
    Class<K> keyClass;
    Class<V> valueClass;
    Serializer<K> keySerializer;
    Serializer<V> valueSerializer;

    public Writer(Configuration conf, FSDataOutputStream out, 
        Class<K> keyClass, Class<V> valueClass,
        CompressionCodec codec, Counters.Counter writesCounter)
        throws IOException {
	 //根据文件流封了一层可以在输出时做crc
      this.checksumOut = new IFileOutputStream(out);
      this.rawOut = out;
      this.start = this.rawOut.getPos();
      
      if (codec != null) {
		...
      } else {
		//writer内部用于io输出的流是基于checksumOut产生的。
        this.out = new FSDataOutputStream(checksumOut,null);
      }
      
	// key,value序列化类,是输出key,value到buffer中,真正写的时候从buffer中取出
      this.keyClass = keyClass;
      this.valueClass = valueClass;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      this.keySerializer.open(buffer);
      this.valueSerializer = serializationFactory.getSerializer(valueClass);
      this.valueSerializer.open(buffer);
    }

part4:无Combiner处理

         当用户没有指定commbiner,就不需要做combiner处理,可以通过IFile.Writer直接将已排序好的数据逐个按分区输出到磁盘文件。区分是否是同一个分区的数据,是根据当前spindex所指向的一级索引kvoffsets所标识的数据是否属于当前分区号,如果是同一个分区,就使用writer进行输出,否则切换到处理下一个分区。

         这里有一点需要注意的是,二级索引kvindices中每一项(分区号,keyOffset,valOffset)标识一对key,value,key的长度可以根据valOffset-keyOffset获取到key的长度,而value的长度需要通过先取得kvindices中的下一项,通过下一个项中的key的偏移-当前的val的偏移获取到val的长度。这部分的代码会封装在getVBytesForOffset

         writer的输出比较简单,输出key,value之前,先输出key长度,value长度。

    public void append(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
      int keyLength = key.getLength() - key.getPosition();
      if (keyLength < 0) {
        throw new IOException("Negative key-length not allowed: " + keyLength + 
                              " for " + key);
      }
      
      int valueLength = value.getLength() - value.getPosition();
      if (valueLength < 0) {
        throw new IOException("Negative value-length not allowed: " + 
                              valueLength + " for " + value);
      }

      WritableUtils.writeVInt(out, keyLength);
      WritableUtils.writeVInt(out, valueLength);
      out.write(key.getData(), key.getPosition(), keyLength); 
      out.write(value.getData(), value.getPosition(), valueLength); 

      // Update bytes written
      decompressedBytesWritten += keyLength + valueLength + 
                      WritableUtils.getVIntSize(keyLength) + 
                      WritableUtils.getVIntSize(valueLength);
      ++numRecordsWritten;
    }

part5: Combiner处理

    如果用户指定过Combiner,那么处理和无Combiner有一点小差别。需要在输出的时候,针对同一分区内的数据做一次过滤。同一分区的数据区间通过spstart,spindex标识出来。

    combineCollector.setWriter(writer);这里将IFile.Writer设置进去,在combiner处理中调用collect将会调用到CombineOutputCollector.collect,这一步就是和无Combiner一样将数据输出到IFile.Writer中。

    public synchronized void collect(K key, V value)
        throws IOException {
      outCounter.increment(1);
      writer.append(key, value);
      if ((outCounter.getValue() % progressBar) == 0) {
        progressable.progress();
      }
    }

    combinerRunner.combine(kvIter, combineCollector);是如何执行的呢,这里会因为combinerRunner的不同而不同,我们关注的是旧的MR处理,因此我们跟踪到OldCombinerRunner.combiner,可以看到流程实际上非常简单,整个迭代的过程是判断是否还有数据没有被处理掉,有则一直循环,依次调用reduce函数,每处理一次相同的key的数据后,通过nextKey切换到下一个不同的key再次重复。在用户的reduce函数内,因为collector是CombineOutputCollector,因此用户在collector.collect输出key,value,实际上是输出到IFile.Writer流中。

    protected void combine(RawKeyValueIterator kvIter,
                           OutputCollector<K,V> combineCollector
                           ) throws IOException {
	//combiner是一个Reduer
      Reducer<K,V,K,V> combiner = 
        ReflectionUtils.newInstance(combinerClass, job);
      try {
		//取得value的迭代器
        CombineValuesIterator<K,V> values = 
          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
                                         valueClass, job, Reporter.NULL,
                                         inputCounter);
		//判断依据是spstart是否走到spindex
        while (values.more()) {
          combiner.reduce(values.getKey(), values, combineCollector,
              Reporter.NULL);
		//跳过相同key直到读取到下一个不相同的key
          values.nextKey();
        }
      } finally {
        combiner.close();
      }
    }

    Reducer的处理函数reduce大家都知道入参是key和一串value,这里的一串value通过Iterator来表示。void reduce(K2 key,Iterator<V2> values, OutputCollector<K3, V3> output, Reporterreporter) throws IOException;

    那这里的Iterator<V2>values是如何从kvoffsets中将已经排序过的,相邻的相同的key的value放在一起的呢,这部分功能是有CombineValuesIterator的父类ValuesIterator来实现的,ValuesIterator的基类是Iterator,Iterator的接口hasNext和next都有实现。

    ValuesIterator有一个一直被调用到的方法,是readNextKey用来获取下一个key并判断是否后续还有数据(more标识)以及是否还有相同的key(hasNext标识)。

    private void readNextKey() throws IOException {
      //根据spstart是否到达spindex指向的区间尾部
	  more = in.next();
      if (more) {
        DataInputBuffer nextKeyBytes = in.getKey();
        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
            nextKeyBytes.getLength() - nextKeyBytes.getPosition());
		//将keyIn中的key反序列化到nextKey变量中
        nextKey = keyDeserializer.deserialize(nextKey);
        //判断是否还有相同的key存在
        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
      } else {
        hasNext = false;
      }
    }

    public boolean hasNext() { return hasNext; }

    private int ctr = 0;
	public VALUE next() {
      if (!hasNext) {
        throw new NoSuchElementException("iterate past last value");
      }
      try {
		//返回相同的key的value,读取下一个key,如果key相同,
		//下一次仍会进入到next函数中,读取到相同key的value
        readNextValue();
        readNextKey();
      } catch (IOException ie) {
        throw new RuntimeException("problem advancing post rec#"+ctr, ie);
      }
      reporter.progress();
      return value;
    }
	
	//在每调用一次reduce处理完相同key所对应的一串value,
	//会通过nextKey函数取得下一个不同的key,重新进入到reduce函数。
    /** Start processing next unique key. */
    void nextKey() throws IOException {
     //读取到下一个不同的key,实际上combiner的处理,是不会进入到while循环内
      while (hasNext) { 
        readNextKey();
      }
      ++ctr;
      
      // move the next key to the current one
      KEY tmpKey = key;
      key = nextKey;
      nextKey = tmpKey;
      hasNext = more;
    }

part6:关闭流,记录索引信息

         在输出输出完成后,会调用IFile.Writer的close函数,插入两个EOF_MARKER,并写入checksum.

    public void close() throws IOException {

      // Close the serializers
      keySerializer.close();
      valueSerializer.close();

      // Write EOF_MARKER for key/value length
      WritableUtils.writeVInt(out, EOF_MARKER);
      WritableUtils.writeVInt(out, EOF_MARKER);
      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
      
      //Flush the stream
      out.flush();
  
      if (compressOutput) {
        // Flush
        compressedOut.finish();
        compressedOut.resetState();
      }
      
      // Close the underlying stream iff we own it...
      if (ownOutputStream) {
        out.close();
      }
      else {
        //写入checksum
        checksumOut.finish();
      }

      compressedBytesWritten = rawOut.getPos() - start;

      if (compressOutput) {
        // Return back the compressor
        CodecPool.returnCompressor(compressor);
        compressor = null;
      }

      out = null;
      if(writtenRecordsCounter != null) {
        writtenRecordsCounter.increment(numRecordsWritten);
      }
    }

    checkSum的写入可以看到IFileOutputStream的finish函数,会从DataChecksum取出4个字节的checksum值写入到文件尾部。

  public void finish() throws IOException {
    if (finished) {
      return;
    }
    finished = true;
    sum.writeValue(barray, 0, false);
    out.write (barray, 0, sum.getChecksumSize());
    out.flush(); 
  }

    关闭掉输出流后,会将当前分区在磁盘文件中的起始位置,结束位置信息记录到索引信息中。索引信息在内存中是存放在SpillRecord中。

part7: IFile的索引文件

    每个IFile数据文件就有一个对应的索引文件和它一一对应,这个索引文件有可能在内存,也有可能在磁盘上真实存在的索引文件。IFile文件对应的的索引信息会在满足条件的情况下内存中缓存着,一个IFile对应的索引信息封装在SpillRecord中,这个索引信息SpillRecord存储在indexCacheList中,当索引的cache超过1M大小后,那么会将后来产生的索引信息输出到磁盘上形成一个索引文件。这个索引文件的文件名为"spill"+ spillNumber +".out.index",spillNumber就是:numSpills变量所记录的当前进行到第几次spill。

 

         以每个文件用户设置了两个ReduceTask那么paritition个数为2,那么IFile的索引文件在磁盘中的形式为:

索引对应的数据

索引文件存储内容

Spill0的partition0

startOffset

rawLength

partLength

Spill0的partition1

startOffset

rawLength

partLength

Spill1的partition0

startOffset

rawLength

partLength

Spill1的partition1

startOffset

rawLength

partLength

 

8字节的crc

 

Merge

         sortAndSpill已经将内存中的数据写成一个个IFile数据文件,这些文件最终会被合并为一个数据文件以及该数据文件对应的索引文件。Merge这部分将会分析数据文件是如何被merge成单个文件。

         先回到runOldMapper中,在前面我们介绍过这部分代码了,再次重新看看这部分。collector.flush将会触发将MapOutputBuffer中的剩余数据flush到磁盘上,并最终将已经存在磁盘上的数据文件合并为一个文件。

runOldMapper:
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      collector.flush();

	// MapOutputBuffer.flush
    public synchronized void flush() throws IOException, ClassNotFoundException,
                                            InterruptedException {
      LOG.info("Starting flush of map output");
      spillLock.lock();
      try {
		//等待正在进行中的spill动作完成
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
        if (sortSpillException != null) {
          throw (IOException)new IOException("Spill failed"
              ).initCause(sortSpillException);
        }
		//缓存中剩余的数据,需要触发一次spill动作将剩余数据spill到磁盘上
        if (kvend != kvindex) {
          kvend = kvindex;
          bufend = bufmark;
          sortAndSpill();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException(
            "Buffer interrupted while waiting for the writer"
            ).initCause(e);
      } finally {
        spillLock.unlock();
      }
      assert !spillLock.isHeldByCurrentThread();
      // shut down spill thread and wait for it to exit. Since the preceding
      // ensures that it is finished with its work (and sortAndSpill did not
      // throw), we elect to use an interrupt instead of setting a flag.
      // Spilling simultaneously from this thread while the spill thread
      // finishes its work might be both a useful way to extend this and also
      // sufficient motivation for the latter approach.
      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
	//触发将小的spill文件合并为大的spill文件。
      mergeParts();
      Path outputPath = mapOutputFile.getOutputFile();
      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
    }

	merge的动作会有mergeParts函数触发,先看该函数的实现:
    private void mergeParts() throws IOException, InterruptedException, 
                                     ClassNotFoundException {
      // get the approximate size of the final output/index files
      long finalOutFileSize = 0;
      long finalIndexFileSize = 0;
      final Path[] filename = new Path[numSpills];
      final TaskAttemptID mapId = getTaskID();
	//获取磁盘上的所有spill文件的文件名
      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
	//只有一个spill文件,那么只需要将数据文件和索引文件rename即可
      if (numSpills == 1) { //the spill is the final output
        rfs.rename(filename[0],
            new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(0),
              new Path(filename[0].getParent(),"file.out.index"));
        } else {
          indexCacheList.get(0).writeToFile(
                new Path(filename[0].getParent(),"file.out.index"), job);
        }
        return;
      }
	
	
      // read in paged indices
	//加载因内存中缓存不下而刷出到磁盘上的索引文件到内存中
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job, null));
      }

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
	//获取最终的spill文件和index文件的路径名
      Path finalOutputFile =
          mapOutputFile.getOutputFileForWrite(finalOutFileSize);
      Path finalIndexFile =
          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
	
	//没有触发过spill动作,则形成一个空的spill文件和index文件。
      if (numSpills == 0) {
        //create dummy files
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }
      {
        IndexRecord rec = new IndexRecord();
        final SpillRecord spillRec = new SpillRecord(partitions);
        for (int parts = 0; parts < partitions; parts++) {
          //create the segments to be merged
		//抽取spill文件中属于该paritition的索引形成一个segment
		//所有属于同一个分区的信息性能一个segment列表
          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
          for(int i = 0; i < numSpills; i++) {
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);

            if (LOG.isDebugEnabled()) {
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
            }
          }
		
		//将属于同一个分区的数据进行merge
          //merge
          @SuppressWarnings("unchecked")
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

          //write merged output to disk
		//将merge后的数据写入到最终的spill文件中
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }

          //close
          writer.close();

          // record offsets
		//记录当前分区在spill.out文件中的索引信息
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);
        }
		//将索引信息写入到spill.index.out        		    
	    spillRec.writeToFile(finalIndexFile, job);
        finalOut.close();
		//将每次触发spill而产生的spill文件删除
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
        }
      }
    }

mergeParts的处理流程主要分为几个步骤:

1.获取到磁盘上属于这个Task的所有spill文件名

2.整个MapTask运行过程中只是触发过一次spill动作,那么只需要做一下rename那mergeParts就算完成了。rename的过程是将spill0.out重命名为spill.out,将索引文件spill0.out.index重命名为spill.out.index。如果spill0.out的索引文件还在缓存中,则只要将缓存的索引写入到spill.out.index。

3.前面触发产生的spill文件的索引会缓存在cache中也就是在indexCacheList,因为cache大小有限,因此后面spill产生的索引信息会落到索引文件中,这里需要加载因内存中缓存不下而刷出到磁盘上的索引文件。

4.获取最终的spill文件的路径名:spill.out和索引文件的路径名:spill.out.index,并创建spill.out文件的输出流。

5.如果传递给这个MapTask的一个空的InputSplit,那么就没有后续的merge动作,只要在spill.out文件中只是输出两个end标记和一个4个字节的crc,在spill.out.index中记录下索引信息。

6.首先,先介绍下两个变量类型,IndexRecord和Segment。

IndexRecord:是记录一分区的索引信息,一个spill文件的索引信息是由n个partition的索引IndexRecord组成一个Spill文件的索引SpillRecord。

Segment:类似IndexRecord,但是还多一些信息,表示这个分区的索引是对应的那个spill文件。

1)在这部分处理中,标明了磁盘上必然有有多个spill文件,需要将这些spill文件属于同一个partition的索引信息封装在segment列表中。

2)Merge.merge需要根据segment 列表将不同spill文件中同一个parition的数据进行merge。

3)在merge完成后,如果没有指定combiner那么直接通过IFile.Writer将数据写入到文件中,如果有则调用用户指定的Combiner,对同一个key的数据进行过滤,combiner的处理在前面已经分析过了,不再累赘。

4)在IndexRecord中记录合并后属于这个partition的索引信息,将该索引信息记录到SpillRecord中。

5)重复1)到4)直至对所有partition处理完毕。

7.将spill.out的索引文件写入到spill.out.index中。

8.删除spill文件:spill0.out,...spilln.out,这里有一点奇怪的是没有删除spill文件对应的索引文件。我看到在hadoop2.4.0中也没有删除,这个还不清楚是否故意而为之。

总结

         至此,整个详细的MapTask的分析就此为完成了,在分析过程中我们知道了MapTask是如何使用循环缓存区管理数据,知道了数据在缓存不下是如何做spill处理的,spill输出的数据格式,combiner如何处理,如何将多一个文件merge为一个等等。也希望通过阅读这部分源码能学习到部分设计思路,能在未来的设计中提供多一种思路。


hadoop的编译环境,就是编写mapreduce代码的,我有IBM提供的mapreduce_tools插件与eclipse搭建环境

map的JVM内存溢出,这个值默认是200M,有些小,你需要设置的大点,设置mapred.child.java.opts为512M试试
 

hadoop的mapreduce常见算法案例有几种

基本MapReduce模式

计数与求和
问题陈述:
有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
解决方案:
让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合然后把他们的频次加和。

1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Reducer
7 method Reduce(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)

这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Reducer的数据量:

1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})

如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:

1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Reducer
14 method Reduce(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)

应用:Log 分析, 数据查询

整理归类

问题陈述:
有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每个条......余下全文>>
 

相关内容