MapReduce任务执行过程研究之Collect过程


最近一直在找工作,写论文,对MapReduce源代码的学习搁置了很久,想来想去认为不能放弃,有意义的事情一定要做好,要做到底,要尽力。前面的文章到后来写的有些心不在焉,有应付之嫌,如今重新拾起,认真学习,认真写下去。MR 2.0已经发布很久了,新架构新思想很值得学习,学无止境啊。

参考书目:

【1】《Java编程思想(第四版)》

【2】《Hadoop 技术内幕:深入解析MapReduce架构设计与实现原理》

【3】《Hadoop 技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》

从Map阶段的Collect过程开始吧,力求有所收获。

看MapTask类的入口函数run,根据配置判断启用old mapper或new mapper。如果是前者:

准备一个MapRunner跑用户的map函数。这个MapRunner实现了MapRunnable泛型接口,四个泛型参数分别代表map的输入键值对和输出键值对的类型(INKEY,INVALUE,OUTKEY,OUTVALUE)。对于MapRunner来说,两个泛型参数来自RecordReader<INKEY,INVALUE>对象;另外两个来自OldOutputCollector对象。后者使用MapOutputBuffer<OUTKEY,OUTVALUE>对象构造。MapOutputBuffer类实现了泛型接口MapOutputCollector,因而具有collect功能。这样一个MapRunner就具备了读取数据(read)和输出数据(collect)的功能。MapRunner通过run函数使用上述两个功能对象。

如果是后者,就有了新的一套RecordReader和OutputCollector,并使用了一个Context对象封装上述功能,传入run函数。不打算详细学习这部分内容。

回到old mapper的实现中,前面提到泛型,由于对java中的泛型技术比较陌生,这里详细学习一下MapRunner.run方法中涉及到泛型技术,顺便还有反射的内容。

首先,看RecordReader,它是一个泛型接口。使用泛型而不是普通接口的好处是,实现接口不仅仅具有了接口的功能,同时接口方法的参数和返回值支持多种类型。对于RecordReader来说,支持多种类型的key和value。如果不使用泛型,则在接口中使用key和value类型的基类,这样就只支持基类及其派生类,不支持该派生体系外的类型。Java支持泛型方法,这使得方法能够独立于类产生变化【1】。如果能使用泛型方法解决问题,就不使用泛型类。MapTask类的runOldMapper方法就是一个泛型方法,其签名如下:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runOldMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    )
MapTask本身不是个泛型类。四个泛型参数在构造MapRunnable对象时使用:

MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
MapRunnable也是个泛型接口,其run方法的参数RecordReader和OutputCollector使用了泛型参数:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
再看runner的构造,使用ReflectionUtils的静态方法实现,该类是MapReduce提供的一个反射工具类。newInstance方法是个静态泛型方法:
public static <T> T newInstance(Class<T> theClass, Configuration conf) {
    T result;
    try {
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
        meth.setAccessible(true);
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      result = meth.newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    setConf(result, conf);
    return result;
  }

该方法的作用是根据给定的类型和配置创建对象。在Java中,一个static方法无法访问泛型类的泛型参数。因此,如果static方法需要使用泛型能力,就必须使其成为泛型方法【1】。静态泛型方法经常用于一些工具类作为创建对象的工具。具体看方法实现,首先查看缓存中有没有该类型的构造方法对象,这个缓存对象是这样实现的:

 /** 
   * Cache of constructors for each class. Pins the classes so they
   * can't be garbage collected until ReflectionUtils can be collected.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
    new ConcurrentHashMap<Class<?>, Constructor<?>>();

ConcurrentHashMap允许并发的读取和写入,在修改完成之前,读取者无法看到。构造器缓存存储了很多类型的构造器对象,因此并发访问是必需的。从注释中看出,将构造器对象存入缓存,使得这些构造器所属类的对象不致被垃圾回收,除非整个工具类被垃圾回收。这里还有一个细节,在声明容器对象时泛型参数列表重复出现。使用泛型方法的类型参数推断特性可以简化代码。容器的创建可以通过下面类来实现:

public class Maps{
	public static <K,V> Map<K,V> map(){
		return new HashMap<K,V>();
	}
	public static <K,V> ConcurrentHashMap<K,V> cMap(){
		return new ConcurrentHashMap<K,V>();
	}
	
	public static void main(String[] args){
		Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.cMap();
	}
}

现在构造器缓存就使用工具类Maps的静态方法创建,而参数K和V可以由类型推断机制得知。回到newInstance方法中,getDeclaredConstructor方法返回指定类的构造器对象,其参数即表示构造器的参数列表。Constructor对象的newInstance方法的作用相当于调用该构造器所属类的构造器生成一个类的实例。Class对象的newInstance方法有相同的功能,实际上内部调用的也是Constructor对象的newInstance方法:

final Constructor<T> c = getConstructor0(empty, Member.DECLARED);
cachedConstructor = c;
Constructor<T> tmpConstructor = cachedConstructor;
return tmpConstructor.newInstance((Object[])null);

上面四句就是使用Class的newInstance构造对象的大概过程。

回到run方法,MapRunnable的run有两种实现:一是普通的MapRunner,另一个是多线程MultithreadedMapRunner,这里先学习前者。每个MapRunner对象中都有一个Mapper泛型对象用于执行用户提交的Map函数。

this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
mapper.map(key, value, output, reporter);

mapper的构造也是通过反射实现,用户提交作业时指定的是Mapper的Class类名。由此看来,MapReduce编程框架带给用户的方便之处与Java的反射机制是分不开的。

前面的文章提到过,collect方法是由用户的map函数调用的,例如Grep应用的mapper类RegexMap类中的map函数:

public void map(K key, Text value,
                  OutputCollector<Text, LongWritable> output,
                  Reporter reporter)
    throws IOException {
    String text = value.toString();
    Matcher matcher = pattern.matcher(text);
    while (matcher.find()) {
      output.collect(new Text(matcher.group(group)), new LongWritable(1));
    }

这里实际调用的是OldOutputCollector的collect方法,该方法通过partitioner获得key/value所在分区后,组成一个三元组以参数传递给MapOutputBuffer的collect方法:

collector.collect(key, value, partitioner.getPartition(key, value, numPartitions));

collect过程会将三元组写入环形缓冲区,详见前面文章,这里学习一些语言和设计上的特性。collect过程与spill过程同步进行,并发控制通过MapOutputBuffer的可重入互斥锁spillLock控制:

private final ReentrantLock spillLock = new ReentrantLock();

Thinking in Java中对可重入互斥锁的解释为:ReentrantLock允许你尝试着获取但最终未获取锁,这样如果其他人已经获取了这个锁,那你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放。在Java中显式使用锁对象Lock的情况比较少,因为Lock对象必须显式地创建、锁定和释放。但有时synchronized关键字不能实现一些特殊需求:尝试着获取锁且最终获取失败,或者尝试获取锁一段时间,然后放弃。这里举一个书上的例子:

private ReentrantLock lock = new ReentrantLock();
	public void untimed(){
		boolean captured = lock.tryLock();
		try{
			System.out.println("tryLock(): " + captured);
		}finally{
			if(captured)
				lock.unlock();
		}
	}

lock.tryLock()如果没有获得锁,captured为false,此时不会阻塞线程,而是会继续执行下面语句输出一行。在finally块中,根据是否捕获到锁来释放锁。另外,在ReentrantLock上阻塞的任务具备可以被中断的能力,这与在synchronized方法或临界区上阻塞的任务不同(后者是不可中断的阻塞,不会抛出InterruptedException异常)。在collect方法中之所以使用可重入锁,我想就是因为使用上述后一种特性微笑,使其在阻塞时可中断,抛出异常。在MapOutputBuffer中还定义了两个条件变量spillReady和spillDone

private final Condition spillDone = spillLock.newCondition();
private final Condition spillReady = spillLock.newCondition();

这两个Condition对象通过互斥锁对象创建。在Java中Condition对象可以完成同步的功能,其操作方式类似信号量的操作,提供了await和signal以及signalAll方法,对应于Object中的wait,notify和notifyAll方法。Condition通常与Lock一起使用。在这里,startSpill方法中调用signalAll方法唤醒等待锁的线程:

spillReady.signal();

准备开始spill过程。当缓冲区满时,不能继续写入缓冲区,collect线程等待:

spillDone.await();

在看缓冲区的消费者spill线程spillThread的run方法框架(保留有关同步的部分):
   public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              sortAndSpill();
           }finally {
              spillLock.lock();
              ...
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

首先获得锁,然后唤醒在spillDone上等待的collect线程,表明spill溢写结束,可以继续写入缓冲区了。当进入正常写入状态后(kvstart==kvend),调用spillReady.await(),挂起spill线程,暂停溢写,直到collect线程再次调用startSpill方法。当spill线程被唤醒并再次获得锁时,调用sortAndSpill对缓冲区数据进行依次快速排序然后写入磁盘。

注意,在调用await,signal和signalAll之前必须拥有锁,这里两个Condition变量在使用前必须拥有锁spillLock。collect同步控制的核心逻辑如下,用于对比:

     spillLock.lock();
      try {
        boolean kvfull;
        do {
          // sufficient acct space
          kvfull = kvnext == kvstart;
          final boolean kvsoftlimit = ((kvnext > kvend)
              ? kvnext - kvend > softRecordLimit
              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
          if (kvstart == kvend && kvsoftlimit) {
            startSpill();
          }
          if (kvfull) {
            while (kvstart != kvend) {
              spillDone.await();
            }
          }
        } while (kvfull);
      } finally {
        spillLock.unlock();
      }

有一个疑问,如果spill线程发现collect正在写缓冲区而挂起,那么spill获得的锁就挂起了,collect就获得不到锁了,也就无法调用startSpill方法去唤醒挂起的spill线程,这样岂不是死锁了?

研究了相关资料,找到答案:await操作在调用时会先释放锁,然后挂起线程,并将线程加入一个等待队列。在调用signal时会让等待队列中第一个线程重新获得锁,并继续执行。这样spill挂起时,spillLock被释放掉,collect线程会持续获得锁,直到满足spill条件,调用startSpill方法,唤醒挂起的spill线程,当collect释放锁时,spill线程会重新获得spillLock,并继续执行。

使用Condition对象的目的相当于在锁上加上一个条件,实现更精细的同步控制。这里在同一个锁spillLock上使用了两个条件,spillReady条件表示只有缓冲区满足一定条件才能发生spill,读取缓冲区(消费者行为);spillDone条件表示只有满足一定条件(在这里只有缓冲区满的时候collect线程才挂起,即便是spill正在进行,缓冲区依然可以写入,读写不冲突,这体现了环形缓冲区的优势)才能写缓冲区(生产者行为)。

这个地方关于条件的锁的理解有什么错误或不足请不吝赐教。关于缓冲区的操作比较复杂,参考书目【2】第8章内容中作者针对各类情况给出了图文描述,推荐阅读。


看看sortAndSpill做了些什么,首先对bufstart和bufend下标之间的数据排序:

sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

sorter是一个IndexSorter接口类型,其实现有HeapSort和QuickSort两种,这里采用的是后者快速排序。关于快速排序的实现请参考另一篇短文。接着按照分区依次将记录写入临时文件,如果有Combiner,先进行combine。最后记录分区的元信息到spillRecord。仔细观察combiner发现,它其实是一个reducer,调用reduce函数,并通过collect过程输出结果:

 Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job);
      try {
        CombineValuesIterator<K,V> values = 
          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,  valueClass, job, Reporter.NULL,  inputCounter);
        while (values.more()) {
          combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
          values.nextKey();
        }
      } finally {
        combiner.close();
      }

在调用combine方法前,指定了combineCollector的writer对象用于在reduce函数调用collect时输出结果。

最后研究一下Hadoop中的序列化【3】。Hadoop重新定义了序列化的机制,原因是:在Java序列化的过程中,序列化输出中保存了大量的附加信息,导致序列化结果膨胀,对于需要保存和处理大规模数据的Hadoop来说,需要一个新的序列化机制。使用Java实现对象的序列化简单概括为:

1. 实现Serializable接口。

2. 在某种OutputStream的基础上创建ObjectOutputStream对象。

3. 调用writeObject方法进行序列化。

反序列化的过程类似,只需要使用对应的输入流,并调用readObject即可。Hadoop对序列化过程的优化为:同一个类对象的序列化结果只输出一份元数据;重用对象,在已有对象上进行反序列化操作。

具体的实现机制是:

可序列化的对象需要实现Writable接口,该接口含有两个方法:write(DataOutput out) 和 readFields(DataInput in)。前者借助Java的DataOutput类对象将Java原生类型以字节形式写入二进制流,后者从二进制流中读取字段。

Hadoop还提供了带有比较功能的WritableComparable接口,具有高效比较能力的RawComparator接口。前者兼具比较和序列化的功能;后者可以比较流中读取的未被反序列化为对象的记录,节省了创建对象的开销,十分高效。

Hadoop中的Text类型即为一种常见的键类型,其声明如下:

public class Text extends BinaryComparable
    implements WritableComparable<BinaryComparable> {}

首先它是一种BinaryComparable,然后它具有序列化和比较功能。Java为每个基本类型提供了对应的Writable类,short和char类型可以存入int类型。对于整型,有定长和变长两种版本,前者序列化后数据是定长的,后者可根据数据的实际长度变化,节约存储空间。如果需要序列化不同类型的对象到某个字段,可使用ObjectWritable类:

public class ObjectWritable implements Writable, Configurable {

  private Class declaredClass;//实际类型的类对象
  private Object instance;//需要序列化的对象
  private Configuration conf;

  public ObjectWritable() {}
  
  public ObjectWritable(Object instance) {
    set(instance);
  }

  public ObjectWritable(Class declaredClass, Object instance) {
    this.declaredClass = declaredClass;
    this.instance = instance;
  }
...
}

它的write方法中,分别处理了null、Java数组、String、Java基本类型、枚举类型和Writable子类六种情况。在输出时要记住对象的实际类型,因为传递给instance字段可能是实际类型的父类型。在readFields方法中,使用工厂方法根据传入的Class对象,创建Writable对象。

Hadoop还提供了简单的序列化框架API。通过Serialization实现获得一个Serializer对象,可将一个对象转换为一个字节流的实现实例。在collect过程中,会将key/value序列化到缓冲区中。这里使用了Serializer:

 private final Serializer<K> keySerializer;
 private final Serializer<V> valSerializer;

Serializer有两种实现,一种是JavaSerializationSerializer,一种是WritableSerializer。看后者的实现:

static class WritableSerializer implements Serializer<Writable> {
    private DataOutputStream dataOut;
    public void open(OutputStream out) {
      if (out instanceof DataOutputStream) {
        dataOut = (DataOutputStream) out;
      } else {
        dataOut = new DataOutputStream(out);
      }
    }
    public void serialize(Writable w) throws IOException {
      w.write(dataOut);
    }
    public void close() throws IOException {
      dataOut.close();
    }
  }

使用Serializer序列化首先通过open方法打开,传入一个输出流对象;然后调用serialize方法将对象序列化到流中,该方法实际调用的是Writable对象的write方法,传入的就是打开的流;最后使用close方法关闭。在collect阶段将key和value序列化:

    keySerializer.serialize(key);
    valSerializer.serialize(value);

注意JavaSerializationSerializer直接采用了Java的序列化机制,因此效率不高。另外,与Serializer对应的是Deserializer,用于反序列化,不赘述。今后遇到与序列化相关的功能时,再继续学习。

Collect过程结束,以后可能会就某个话题有补充。下篇计划学习Reduce任务的过程。

2014.04.01






相关内容

    暂无相关文章