Hadoop学习之MapReduce(六)


在这篇文章中主要关注MapReduce作业的输入和输出,由于Hadoop版本的变化及本人对这些变化了解的还不够深入,难免有描述不清楚的地方,会在进一步学习后更正不准确的地方。

作业输入

InputFormat描述了MapReduce作业的输入规范。MapReduce框架依靠作业的InputFormat实现:

1.   验证作业的输入规范。

2.   将输入文件分割为逻辑的InputSplit,每个InputSplit被分配给单个的Mapper。

3.   提供RecordReader的实现用于收集来自逻辑InputSplit的输入记录,进而被Mapper处理。

基于文件的InputFormat的默认行为,通常是FileInputFormat的子类,是基于输入文件的总的大小(单位为字节)将输入分割到逻辑InputSplit中。但是输入文件的FileSystem块大小作为InputSplit的上限,下限可以通过mapred.min.split.size设置。

显然,基于输入大小的逻辑分割对于很多应用程序是不够的,因为记录边界需要遵守。在这种情况下,应用程序必须实现一个RecordReader,遵守记录边界和为单个的任务呈现逻辑InputSplit的面向记录的视图的责任依赖于该RecordReader

TextInputFormat是默认的InputFormat。如果TextInputFormat作为给定作业的InputFormat,MapReduce框架检查输入文件是否以.gz结尾,并自动使用合适的CompressionCodec解压缩文件。但是必须注意以上述扩展结尾的压缩文件不能被分隔,每个这样的压缩文件被单个的mapper作为整体处理。

InputSplit

InputSplit表示被单独的Mapper处理的数据。通常InputSplit呈现了输入的面向字节的视图,RecordReader的任务是处理和呈现一个面向字节的视图。FileSplit是默认的InputSplit,它设置map.input.file为用于逻辑分隔的输入文件的路径。

RecordReader

RecordReader从InputSplit读取<key, value>对。通常RecordReader转化由InputSplit提供的输入的面向字节的视图,并将面向记录的数据呈现给Mapper处理,因而RecordReader承担处理记录边界和呈现带键值对的任务的责任。

作业输出

OutputFormat 描述了MapReduce作业的输出规范。MapReduce框架依靠作业的OutputFormat 实现:

1.   验证作业的输出规范。比如,检查输出目录不是已经存在的,否则会报错。

2.   提供RecordWriter 的实现用于写入作业的输出文件。输出文件存储在FileSystem中。

TextOutputFormat是默认的OutputFormat。

OutputCommitter

OutputCommitter描述了MapReduce作业的任务输出的提交。MapReduce框架依靠作业的OutputCommitter实现:

1.       在初始化时建立作业。比如,在作业初始化阶段建立作业的临时输出目录。当作业处于PREP状态和完成初始化任务后,作业被单独的任务建立,一旦建立任务完成,作业将变为RUNNING状态。

2.       在作业完成后清理作业。比如,移除临时输出目录。作业清理是在作业结束时由单独的任务执行的。在清理任务完成,作业声明为SUCCEDED/FAILED/KILLED状态。

3.       建立任务的临时输出目录。任务的建立是在任务初始化期间作为相同任务的一部分完成的。

4.       检查一个任务是否需要提交。这可以避免当任务不需要提交时的提交过程。

5.       提交任务输出。一旦任务完成,如果需要,任务将会提交它的输出。

6.       放弃任务提交。如果任务失败或者被杀死,输出将会被清理。如果任务不能执行清理(在异常处理块中),一个带有相同尝试ID的不同任务将会被加载来执行清理。

FileOutputCommitter 是默认的OutputCommitter。作业的建立和清理任务占据map或者reduce槽,无论哪个在TaskTracker上是可用的。JobCleanup任务、TaskCleanup任务和JobSetup任务依次拥有最高的优先级。

任务的副作用文件

在某些应用程序中,组件任务需要创建和写副文件,这种文件与实际的作业输出文件不同。在这种情况下,相同Mapper或者Reducer的两个实例同时(比如推测任务)尝试打开或者写入FileSystem上的相同文件将会存在问题。因此,开发应用程序的人员必须为每次任务尝试(使用attemptid,指明attempt_200709221812_0001_m_000000_0)选择唯一的名字而不仅仅是每个任务。

为了避免这些问题,MapReduce框架在当OutputCommitter是FileOutputCommitter时维护了一个特殊的${mapred.output.dir}/_temporary/_${taskid}子目录,该目录对于FileSystem(存储任务尝试的输出)上的每次任务尝试可以使用${mapred.work.output.dir} 访问。当任务尝试成功完成时,${mapred.output.dir}/_temporary/_${taskid}中的文件迁移到${mapred.output.dir}中,该过程对应用程序是完全透明地。

开发应用程序的人员可以利用上述的特性。具体该如何做呢?可以在任务执行期间使用FileOutputFormat.getWorkOutputPath()在${mapred.work.output.dir} 中创建任何要求的副文件,并且MapReduce框架将为成功的任务尝试迁移它们,这样就消除了为每个任务尝试选择唯一路径的需要。

需要注意的是,${mapred.work.output.dir} 的值在一个特定的任务尝试期间实际是${mapred.output.dir}/_temporary/_${taskid},这样该值就被MapReduce框架设置了。这样只需要在FileOutputFormat.getWorkOutputPath()返回的路径上创建任何副文件就可以利用该特性。上述的讨论必须将reducer设置为0,因为map的输出直接进入到HDFS中。

RecordWriter

RecordWriter将输出的<key, value>对写入到输出文件中。RecordWriter的实现将作业的输出写入到FileSystem中。

相关内容