InputFormat&OutputFormat,hadoopinputformat


本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputFormat的灵活应用。

InputFormat和OutputFormat简介

InputFormat规定了MR框架,如何解析输入文件,比如说TextOutputFormat内部实现了一个LineRecordReader,从LineRecordReader#nextKeyValue方法可以看出,它的key是将要读的这一行数据的起始字节位置,value是这一行的内容。

OutputFormat规定了MR框架,最后输出的文件的内容格式,比如说SequenceFileOutputFormat,其getRecordWriter方法返回一个RecordWriter的内部类,具体的操作在SequenceFile类里write相关的方法中。SequenceFile的具体实现只从类来看比价复杂,其一条记录可以简单理解成如下的格式[record length][key length][key][value]。

RecordReader和RecordWriter简介

上边提到了RecordReader和RecordWriter,这里简单介绍一下,RecordWriter和RecordReader在MR框架每一个输入输出的地方用到,读写操作调用的都是RecordReader和RecordWriter提供的接口,比如说我们在Mapper#map和Reducer#reduce的结束通常都会写这样一行代码context.write(key,value),这行代码实际调用的就是RecordWriter#write方法,RecordReader是MR框架调用的。

一个InputFormat对应一个RecordReader,一个OutputFormat对应一个RecordWriter。这样在后边的DIY的时候,也知道应该写些什么了。

FileInputFormat和FileOutputFormat

这里以FileInputFormat,FileOutputFormat和其对应的子类TextInputFormat和TextOutputFormat为例,分析一套InputFormat和OutputFormat的具体实现。

1.      InputFormat

InputFormat抽象类中有两个方法:

InputFormat#getSplits(JobContext context)

对输入文件进行逻辑上的分片

InputFormat#createRecordReader(InputSplitsplit,TaskAttemptContext context)

返回一个RecordReader对象,该对象可将输入的InputSplit解析成若干个key/value对。

2.      FileInputFormat

所有需要处理文件的InputFormat的基类,从其实现可以看出,FileInputFormat主要是实现InputFormat#getSplit(JobContext context)方法,由于每个子类对应不同的输入格式,所以解析InputSplit的方法InputFormat#createRecordReader(InputSplit split,TaskAttemptContextcontext)由各个子类自己实现。

这里我们先分析FileInputFormat#getSplit(JobContext context)方法。

 

/**
  * Generate the list of files and make them into FileSplits.
  * @param job the job context
  * @throws IOException
  */
 public List<InputSplit> getSplits(JobContext job) throwsIOException {
   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
   long maxSize = getMaxSplitSize(job);
 
// generatesplits
   List<InputSplit> splits = new ArrayList<InputSplit>();
   List<FileStatus> files = listStatus(job);//获取输入的所有文件
   for (FileStatus file: files) {//一次循环处理一个文件
     Path path = file.getPath();
     long length = file.getLen();
     if (length != 0) {//文件不是空的
        FileSystem fs =path.getFileSystem(job.getConfiguration());
        BlockLocation[] blkLocations =fs.getFileBlockLocations(file, 0, length);//获取一个文件的所有的Block的带位置的信息
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize =computeSplitSize(blockSize, minSize, maxSize);//计算一个FileSplit的大小,计算过程如下,Math.max(minSize,Math.min(maxSize, blockSize))
 
         long bytesRemaining =length;
                     //这个while循环就是根据上边的准备的信息,不停的读文件,产生FileSplit,一直到文件末尾。
          while (((double)bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path,length-bytesRemaining, splitSize,
                                    blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }
                     //处理最后不足splitSize大小的数据
          if (bytesRemaining != 0) {
           int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path,length-bytesRemaining, bytesRemaining,
                      blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable不能分割的话,文件只有一块blkLocations[0]就是这一块
          splits.add(makeSplit(path, 0, length,blkLocations[0].getHosts()));
        }
     } else {
        //Create empty hosts array for zerolength files
        splits.add(makeSplit(path, 0, length,new String[0]));
     }
   }
   // Save the number of input files for metrics/loadgen
   job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
   LOG.debug("Total # of splits: " + splits.size());
   return splits;
  }

3.      TextInputFormat

TextInputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextInputFormat#createRecordReader方法逻辑比较简单,最后返回一个LineRecordReader对象,下面主要看这个方法LineRecordReader#nextKeyValue。

public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = newLongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read oneextra line, which lies outside the upper
    // split limit i.e. (end -1)
while (getFilePosition() <= end) {
//这一句就是读数据的地方了
      newSize =in.readLine(value, maxLineLength,
         Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize <maxLineLength) {
        break;
      }
 
      // line too long. tryagain
      LOG.info("Skippedline of size " + newSize + " at pos " +
               (pos -newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

4.      OutputFormat

OutputFormat# getRecordWriter (TaskAttemptContext context)

返回一个RecordWriter对象

OutputFormat# checkOutputSpecs (JobContext context)

判断输出文件存不存在,写过MR程序的人应该很熟悉了,如果输出路径已经存在话,会抛出一个Output directory " + outDir + " already exists"错误,就是这个方法抛出的。

OutputFormat# getOutputCommitter (TaskAttemptContext context)

MR运行过程中,会产生很多中间文件,比如Mapper的输出,推测式执行Task时产生的文件等等,这个方法负责在任务执行完成后,处理这些中间文件。顺便说下,OutputCommitter对象里的方法都是回调方法,MR自动调用。

5.      FileOutputFormat

FileOutputFormat对上边提到的3个方法中的后两个提供了通用的实现,OutputFormat# getRecordWriter (TaskAttemptContext context)方法需要子类自己实现。

public void checkOutputSpecs(JobContext job
                              ) throws FileAlreadyExistsException, IOException{
    // Ensure that the outputdirectory is set and not already there
    Path outDir =getOutputPath(job);
    if (outDir == null) {
      throw newInvalidJobConfException("Output directory not set.");
    }
 
    // get delegation tokenfor outDir's file system
   TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { outDir },job.getConfiguration());
 
    if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      throw newFileAlreadyExistsException("Output directory " + outDir +
                                          " already exists");
    }
  }
 
public synchronized
     OutputCommittergetOutputCommitter(TaskAttemptContext context
                                        )throws IOException {
    if (committer == null) {
      Path output =getOutputPath(context);
      committer = newFileOutputCommitter(output, context);
    }
    return committer;
  }

6.      TextOutputFormat

TextOutputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextOutputFormat#createRecordWriter方法逻辑比较简单,最后返回一个LineRecordWriter对象,下面主要看这个方法LineRecordReader#write(K key,V Value)。

public synchronized void write(K key, V value)
      throws IOException {
 
      boolean nullKey = key ==null || key instanceof NullWritable;
      boolean nullValue =value == null || value instanceof NullWritable;
      if (nullKey &&nullValue) {
        return;
      }
          //这个地方是最主要的逻辑
      if (!nullKey) {
        writeObject(key);//写出key
      }
      if (!(nullKey ||nullValue)) {
       out.write(keyValueSeparator);//写出key/value的分隔符
      }
      if (!nullValue) {
        writeObject(value);//写出value
      }
      out.write(newline);//写出行结束符
}

SqoopInputFormat和SqoopOutputFormat

下面的分析基于Sqoop1.99.3

在Sqoop中,InputFormat和OutputFormat的子类有3个,分别是:

public class SqoopInputFormat extendsInputFormat<SqoopSplit, NullWritable>
public class SqoopFileOutputFormat extendsFileOutputFormat<Data, NullWritable>
public class SqoopNullOutputFormat extendsOutputFormat<Data, NullWritable>

下面一个个分析:

1.      SqoopInputFormat

从泛型传入的类型可以看出,SqoopInputFormat的key是SqoopSplit,value是NullWritable。直接把整个InputSplit作为key,传到Mapper中,SqoopInputFormat不对InputSplit做任何的解析操作。

2.      SqoopFileOutputFormat

从泛型传入的类型来看,跟SqoopInputFormat类似,只是用KEY部分保存信息。SqoopFileOutputFormat只重写了父类FileOutputFormat的getRecordWriter方法和getOutputCommitter方法,checkOutputSpec方法使用的父类的。

getRecordWriter方法调用SqoopOutputFormatLoadExecutor#getRecordWriter,这个方法在返回一个SqoopRecordWriter的同时,开启一个消费者线程,SqoopRecordWriter是生产者线程。[后面接着分析Sqoop源码时细说]

3.      SqoopNullOutputFormat

SqoopNullOutputFormat将OutputFormat的3个方法都重写了。SqoopNullOutputFormat#getRecordWriter方法同样是调用SqoopOutputFormatLoadExecutor#getRecordWriter。


jobsetInputFormatClass(SequenceFileAsBinaryInputFormatclass);为何setInputFormatClass总是出错?

1. Good Job means
 

为hive写inputFormat,怎把这个inputFormat类加到hive中?

将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义

由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,还需要重启mapreduce服务才行,否则hive运行mapreduce会失败
参考资料:经验
 

相关内容