MapReduce小文件处理之CombineFileInputFormat实现,mapreducecombine



在使用MapReduce的过程中,经常会遇到输入文件特别小(几百KB到几十MB)的情况。Hadoop默认会为每个文件向YARN申请一个container来启动map任务,而container的启动和关闭非常耗时。Hadoop提供了抽象类CombineFileInputFormat,用于将多个小文件合并到同一个map中处理,我们只需实现以下三个类:

CompressedCombineFileInputFormat
CompressedCombineFileRecordReader
CompressedCombineFileWritable


maven

<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.5.0-cdh5.2.1</version>
</dependency>


CompressedCombineFileInputFormat.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;


/**
 * Input format that packs several small input files into one combined split.
 *
 * <p>Keys are {@link CompressedCombineFileWritable} and values are {@link Text}.
 * Files are never split individually ({@link #isSplitable} always returns
 * {@code false}); each file inside a combined split is read by
 * {@link CompressedCombineFileRecordReader}.
 */
public class CompressedCombineFileInputFormat
        extends CombineFileInputFormat<CompressedCombineFileWritable, Text> {

    /** Creates the input format; no configuration is done here. */
    public CompressedCombineFileInputFormat() {
    }

    /**
     * Builds the record reader for a combined split.
     *
     * @param split   the split to read; cast to {@link CombineFileSplit}
     * @param context the task attempt context handed on to the reader
     * @return a {@link CombineFileRecordReader} configured with
     *         {@link CompressedCombineFileRecordReader} as the per-file reader class
     * @throws IOException if the reader cannot be created
     */
    @Override
    public RecordReader<CompressedCombineFileWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        CombineFileSplit combined = (CombineFileSplit) split;
        return new CombineFileRecordReader<CompressedCombineFileWritable, Text>(
                combined, context, CompressedCombineFileRecordReader.class);
    }

    /**
     * Reports every file as non-splittable.
     *
     * @param context the job context (unused)
     * @param file    the file being considered (unused)
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

}


CompressedCombineFileRecordReader.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CompressedCombineFileRecordReader
        extends RecordReader<CompressedCombineFileWritable, Text> {

    private long startOffset;
    private long end;
    private long pos;
    private FileSystem fs;
    private Path path;
    private Path dPath;
    private CompressedCombineFileWritable key = new CompressedCombineFileWritable();
    private Text value;
    private long rlength;
    private FSDataInputStream fileIn;
    private LineReader reader;


    public CompressedCombineFileRecordReader(CombineFileSplit split,
                                             TaskAttemptContext context, Integer index) throws IOException {

        Configuration currentConf = context.getConfiguration();
        this.path = split.getPath(index);
        boolean isCompressed = findCodec(currentConf, path);
        if (isCompressed)
            codecWiseDecompress(context.getConfiguration());

        fs = this.path.getFileSystem(currentConf);

        this.startOffset = split.getOffset(index);

        if (isCompressed) {
            this.end = startOffset + rlength;
        } else {
            this.end = startOffset + split.getLength(index);
            dPath = path;
        }

        boolean skipFirstLine = false;

        fileIn = fs.open(dPath);

        if (isCompressed) fs.deleteOnExit(dPath);

        if (startOffset != 0) {
            skipFirstLine = true;
            --startOffset;
            fileIn.seek(startOffset);
        }
        reader = new LineReader(fileIn);
        if (skipFirstLine) {
            startOffset += reader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
        }
        this.pos = startOffset;
    }

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
    }

    public void close() throws IOException {
    }

    public float getProgress() throws IOException {
        if (startOffset == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - startOffset) / (float)
                    (end - startOffset));
        }
    }

    public boolean nextKeyValue() throws IOException {
        if (key.fileName == null) {
            key = new CompressedCombineFileWritable();
            key.fileName = dPath.getName();
        }
        key.offset = pos;
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        if (pos < end) {
            newSize = reader.readLine(value);
            pos += newSize;
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }

    public CompressedCombineFileWritable getCurrentKey()
            throws IOException, InterruptedException {
        return key;
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }


    private void codecWiseDecompress(Configuration conf) throws IOException {

        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(path);

        if (codec == null) {
            System.err.println("No Codec Found For " + path);
            System.exit(1);
        }

        String outputUri =
                CompressionCodecFactory.removeSuffix(path.toString(),
                        codec.getDefaultExtension());
        dPath = new Path(outputUri);

        InputStream in = null;
        OutputStream out = null;
        fs = this.path.getFileSystem(conf);

        try {
            in = codec.createInputStream(fs.open(path));
            out = fs.create(dPath);
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
            rlength = fs.getFileStatus(dPath).getLen();
        }
    }

    private boolean findCodec(Configuration conf, Path p) {

        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(path);

        if (codec == null)
            return false;
        else
            return true;

    }

}


CompressedCombineFileWritable.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CompressedCombineFileWritable implements WritableComparable {

    public long offset;
    public String fileName;


    public CompressedCombineFileWritable() {
        super();
    }

    public CompressedCombineFileWritable(long offset, String fileName) {
        super();
        this.offset = offset;
        this.fileName = fileName;
    }

    public void readFields(DataInput in) throws IOException {
        this.offset = in.readLong();
        this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(offset);
        Text.writeString(out, fileName);
    }


    public int compareTo(Object o) {
        CompressedCombineFileWritable that = (CompressedCombineFileWritable) o;

        int f = this.fileName.compareTo(that.fileName);
        if (f == 0) {
            return (int) Math.signum((double) (this.offset - that.offset));
        }
        return f;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CompressedCombineFileWritable)
            return this.compareTo(obj) == 0;
        return false;
    }

    @Override
    public int hashCode() {

        final int hashPrime = 47;
        int hash = 13;
        hash = hashPrime * hash + (this.fileName != null ? this.fileName.hashCode() : 0);
        hash = hashPrime * hash + (int) (this.offset ^ (this.offset >>> 16));

        return hash;
    }

    @Override
    public String toString() {
        return this.fileName + "-" + this.offset;
    }

}



MR测试类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;


public class CFWordCount extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CFWordCount(), args));
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);
        conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
        conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
        Job job = new Job(conf);
        job.setJobName("CombineFile Demo");
        job.setJarByClass(CFWordCount.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(CompressedCombineFileInputFormat.class);
        job.setMapperClass(TestMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(IntSumReducer.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.submit();
        job.waitForCompletion(true);

        return 0;
    }

    public static class TestMapper extends Mapper<CompressedCombineFileWritable, Text, Text, IntWritable> {
        private Text txt = new Text();
        private IntWritable count = new IntWritable(1);

        public void map(CompressedCombineFileWritable key, Text val, Context context) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(val.toString());
            while (st.hasMoreTokens()) {
                txt.set(st.nextToken());
                context.write(txt, count);
            }
        }
    }
}



注意:使用CombineFileInputFormat的过程中发现,无论小文件累积到多大,甚至超过HDFS BlockSize,仍然只有一个map split。查看Hadoop源码发现,使用CombineFileInputFormat时,如果没有显式指定CombineFileInputFormat.SPLIT_MAXSIZE,默认不会切分map split。解决方法如下:

conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);


相关内容