在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数


最近开始使用MapReduce,发现网上大部分例子都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat即可。对于文本数据,这个类能够满足大部分应用场景;但是如果要处理以二进制形式存储的结构化记录文件,TextInputFormat就不再适用了。

本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点是如何处理输入数据。参考《Hadoop权威指南(第三版)》得知,需要继承FileInputFormat这个类,并实现以下三个方法:

/*
 * Skeleton of a custom input format: the three methods a FileInputFormat
 * subclass is expected to provide. Method bodies are intentionally left empty.
 * NOTE(review): Hadoop's org.apache.hadoop.mapreduce FileInputFormat appears to
 * declare isSplitable/getSplits with a JobContext parameter - confirm that the
 * Configuration-based signatures shown here are actually invoked by the framework.
 */
class MyInputFormat extends FileInputFormat<Type1, Type2> {
	/*
	 * Reports whether the given file may be split:
	 * "true" means it can be split, "false" means it must not be split.
	 */
	protected boolean isSplitable(Configuration conf, Path path) {
		
	}
	
	/*
	 * The MapReduce client calls this method to obtain all splits and then sends
	 * them to the MapReduce server side.
	 * Note that a split does not contain the actual data, only a description of it:
	 * the path of the file the split belongs to, the split's start offset within
	 * that file, the split's length, and the list of machines holding the actual
	 * data. Implementing this method amounts to filling in that information.
	 * */
	public List<InputSplit> getSplits(Configuration conf) throws IOException {
	}

	/*
	 * A RecordReader produces the sequence of key/value pairs handed to the map
	 * function. It receives two arguments: a split and the job's configuration
	 * information (context).
	 * The logic the MapReduce framework uses to drive a map task can be seen in
	 * Mapper.run:
	 * public void run(Context context) throws IOException, InterruptedException {
	 * 		setup(context);
	 * 		// Calls the RecordReader's nextKeyValue to produce the next pair. Once the
	 * 		// current split is exhausted, nextKeyValue returns false and run() exits.
	 *		while (context.nextKeyValue()) {	
	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
	 *		}
	 *		cleanup(context);
	 * }
	 **/
	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
	}
}

然后继承RecordReader类,并在子类中实现以下几个方法:

/*
 * Skeleton of a custom record reader: the methods a RecordReader subclass has
 * to implement. Method bodies are intentionally left empty.
 */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
	/* Closes the underlying file stream.
	 * */
	public void close() {}

	/*
	 * Returns the processing progress.
	 **/
	public float getProgress() {}

	/*
	 * Returns the current key.
	 * */
	public LongWritable getCurrentKey() throws IOException,
	InterruptedException {}

	/* Returns the current value.
	 * */
	public IntWritable getCurrentValue() throws IOException,InterruptedException {}

	/*
	 * Performs initialisation: opens the file stream and sets the start
	 * position, length, etc. from the split information.
	 * */
	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {}

	/* Produces the next key/value pair.
	 **/
	public boolean nextKeyValue() throws IOException, InterruptedException {
	}
}

以下是三个文件的代码,首先是BinInputFormat.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.examples.BinRecordReader;

class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> {
	
	private static final double SPLIT_SLOP=1.1;
	
	/*
	 * 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
	 */
	protected boolean isSplitable(Configuration conf, Path path) {
		return true;
	}
	
	/*
	 * MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
	 * 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
	 * 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
	 * 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
	 * */
	public List<InputSplit> getSplits(Configuration conf) throws IOException {
		List<InputSplit> splits = new ArrayList<InputSplit>();
		long minSplitSize = conf.getLong("mapred.min.split.size",1);
		long maxSplitSize = conf.getLong("mapred.max.split.size", 1);
		long blockSize = conf.getLong("dfs.block.size",1);
		long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
		FileSystem fs = FileSystem.get(conf);
		String path = conf.get(INPUT_DIR);
		FileStatus[] files = fs.listStatus(new Path(path));

		for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
			FileStatus file = files[fileIndex];
			System.out.println("input file: " + file.getPath().toString());
			long length = file.getLen();
			FileSystem fsin = file.getPath().getFileSystem(conf);
		    BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
		    if ((length != 0) && isSplitable(conf, file.getPath())) {
		        long bytesRemaining = length;
		        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
		          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, splitSize,
		                                   blkLocations[blkIndex].getHosts()));
		          bytesRemaining -= splitSize;
		        }
		        
		        if (bytesRemaining != 0) {
		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, bytesRemaining,
		                     blkLocations[blkLocations.length-1].getHosts()));
		        }
		      } else if (length != 0) {
		        splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
		      } else {
		        //Create empty hosts array for zero length files
		        splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
		      }
		}
		return splits;
	}
	
	/*
	 * 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
	 * 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
	 * public void run(Context context) throws IOException, InterruptedException {
	 * 		setup(context);
	 * 		调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
	 *		while (context.nextKeyValue()) {	
	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
	 *		}
	 *		cleanup(context);
	 * }
	 **/
	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		BinRecordReader reader = new BinRecordReader();
		reader.initialize(split,context);
		return reader;
	}
}

以下为BinRecordReader.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;

/**
 * Return a single record (filename, "") where the filename is taken from
 * the file split.
 */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
	private FSDataInputStream inputStream = null;
	private long start,end,pos;
	private Configuration conf = null;
	private FileSplit fileSplit = null;
	private LongWritable key = new LongWritable();
	private IntWritable value = new IntWritable();
	private boolean processed = false;
	public BinRecordReader() throws IOException {
	}

	/*关闭文件流
	 * */
	public void close() {
		try {
			if(inputStream != null)
				inputStream.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/*
	 * 获取处理进度
	 **/
	public float getProgress() {
		return ((processed == true)? 1.0f : 0.0f);
	}

	/*
	 * 获取当前的Key
	 * */
	public LongWritable getCurrentKey() throws IOException,
	InterruptedException {
		// TODO Auto-generated method stub
		return key;
	}

	/* 获取当前的Value
	 * */
	public IntWritable getCurrentValue() throws IOException,InterruptedException {
		// TODO Auto-generated method stub
		return value;
	}

	/*
	 * 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
	 * */
	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		fileSplit = (FileSplit)inputSplit;
		conf = context.getConfiguration();

		this.start = fileSplit.getStart();
		this.end = this.start + fileSplit.getLength();

		try{
			Path path = fileSplit.getPath();
			FileSystem fs = path.getFileSystem(conf);
			this.inputStream = fs.open(path);
			inputStream.seek(this.start);
			this.pos = this.start;
		}	catch(IOException e)	{
			e.printStackTrace();
		}
	}

	/*生成下一个键值对
	 **/
	public boolean nextKeyValue() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		if(this.pos < this.end) {
			key.set(this.pos);
			value.set(Integer.reverseBytes(inputStream.readInt()));
			this.pos = inputStream.getPos();
			return true;
		} else {
			processed = true;
			return false;
		}
	}
}

以下是主文件IntCount.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.examples.BinInputFormat;

public class IntCount {
	public static class TokenizerMapper extends Mapper<LongWritable, IntWritable, Text, IntWritable>{

		private final static IntWritable one = new IntWritable(1); 
		private Text intNum = new Text();                             

		public void map(LongWritable key, IntWritable value, Context context
				) throws IOException, InterruptedException {
			intNum.set(Integer.toString(value.get()));                              
			context.write(intNum, one);                            
		}
	}

	public static class IntSumReducer 
	extends Reducer<Text,IntWritable,Text,IntWritable> { 
		private IntWritable result = new IntWritable();              

		public void reduce(Text key, Iterable<IntWritable> values, 
				Context context
				) throws IOException, InterruptedException {
			int sum = 0;                                                 
			for (IntWritable val : values) {
				sum += val.get();                                         

			}
			result.set(sum);                                                                              
			context.write(key, result);                                
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println("testing1");
		Configuration conf = new Configuration();
		String[] newArgs = new String[]{"hdfs://localhost:9000/read","hdfs://localhost:9000/data/wc_output21"};
		String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "IntCount");                
		job.setJarByClass(IntCount.class);
		job.setMapperClass(TokenizerMapper.class);  
		job.setCombinerClass(IntSumReducer.class);  
		job.setReducerClass(IntSumReducer.class); 
		//设置自定义的输入类
		job.setInputFormatClass(BinInputFormat.class);
		job.setOutputKeyClass(Text.class);     
		job.setOutputValueClass(IntWritable.class);   
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
接着我们用一段C语言程序生成二进制格式存储的文件。注意:fwrite按本机字节序写入int(x86上为小端序),而Java的readInt按大端序读取,所以BinRecordReader中用Integer.reverseBytes做了转换。C语言代码如下:

#include<stdio.h>
/*
 * Writes the integers 0..9 ten times over (100 ints in total) to "tmpfile",
 * each as sizeof(int) raw bytes in the machine's native representation.
 * Returns 0 on success, 1 if the file cannot be opened, written or closed.
 */
int main(void){
	FILE * fp = fopen("tmpfile","wb");
	int i,j;
	if (fp == NULL) {
		/* Without this check a failed fopen would crash in fwrite. */
		perror("tmpfile");
		return 1;
	}
	for(i=0;i<10;i++) {
		for(j=0;j<10;j++) {
			if (fwrite(&j,sizeof(int),1,fp) != 1) {
				perror("fwrite");
				fclose(fp);
				return 1;
			}
		}
	}
	/* fclose flushes buffered data, so its result matters too. */
	if (fclose(fp) != 0) {
		perror("fclose");
		return 1;
	}
	return 0;
}

将生成的文件拷贝到/read/下,接着启动IntCount这个MapReduce程序,打开运行结果:




相关内容