Advanced MapReduce (4): A Small Reduce-Side Join Example


Dataset files:

customers:

1,Stephanie leung,555-555-555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

orders:

3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009

Expected output:

1	Stephanie leung,555-555-555,B,88.25,20-May-2008
2	Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3	Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
3	Jose Madriz,281-330-8004,A,12.95,02-Jun-2008

The two files are joined on their first field, the customer id: customer 4 has no orders, so the inner join drops it, while customer 3 has two orders and therefore produces two output rows. Next, let's implement this small program.

As mentioned in the previous post, we need to implement a few classes: a subclass of TaggedMapOutput to wrap the records, a mapper extending DataJoinMapperBase, and a reducer extending DataJoinReducerBase. The concrete implementation follows.

The TaggedWritable class extends TaggedMapOutput:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/* TaggedMapOutput is an abstract data type that bundles a tag with the record data.
   It is used here as the output value type of DataJoinMapperBase; since it must implement
   the Writable interface, it provides both serialization methods (write and readFields). */
public class TaggedWritable extends TaggedMapOutput {
		private Writable data;

		public TaggedWritable() {
			this.tag = new Text();
		}
 
		public TaggedWritable(Writable data) { // constructor
			// the tag marks which data source (input file) a record came from
			this.tag = new Text(); // the tag can be set later via setTag()
			this.data = data;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			tag.readFields(in);
			String dataClz = in.readUTF();
			if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
					try {
						this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
					} catch (ClassNotFoundException e) {
						e.printStackTrace();
					}
			}
			data.readFields(in);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			tag.write(out);
			out.writeUTF(this.data.getClass().getName());
			data.write(out);
		}

		@Override
		public Writable getData() {
			return data;
		}
}
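
A quick way to sanity-check the two serialization methods is to round-trip an instance through an in-memory byte stream, outside of Hadoop. The following is only a sketch: it assumes TaggedWritable lives in the com.demo.writables package used by the later imports, and the class name TaggedWritableRoundTrip is made up for this demo.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;

import com.demo.writables.TaggedWritable;

public class TaggedWritableRoundTrip {
		public static void main(String[] args) throws IOException {
			TaggedWritable original = new TaggedWritable(new Text("3,A,12.95,02-Jun-2008"));
			original.setTag(new Text("orders"));

			// serialize with write()
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			original.write(new DataOutputStream(bytes));

			// deserialize into a fresh instance with readFields(), the way the framework does via reflection
			TaggedWritable copy = new TaggedWritable();
			copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

			System.out.println(copy.getTag() + " -> " + copy.getData()); // orders -> 3,A,12.95,02-Jun-2008
		}
}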

The JoinMapper class extends DataJoinMapperBase:

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

import com.demo.writables.TaggedWritable;

public class JoinMapper extends DataJoinMapperBase {

		// Called when the task starts, to generate the tag for this input.
		// Here we simply use the file name as the tag; the tag marks which data source each record belongs to.
		@Override
		protected Text generateInputTag(String inputFile) {
			System.out.println("inputFile = " + inputFile);
			return new Text(inputFile);
		}

		// Here the delimiter is hard-coded as ','; more generally, the user should be able to
		// specify both the delimiter and the group key.
		// Generate the group (join) key.
		@Override
		protected Text generateGroupKey(TaggedMapOutput record) {
			String tag = ((Text) record.getTag()).toString();
			System.out.println("tag = " + tag);
			String line = ((Text) record.getData()).toString();
			String[] tokens = line.split(",");
			return new Text(tokens[0]);
		}

		// Return a TaggedWritable carrying whatever Text tag we want.
		@Override
		protected TaggedMapOutput generateTaggedMapOutput(Object value) {
			TaggedWritable retv = new TaggedWritable((Text) value);
			retv.setTag(this.inputTag); // don't forget to set the tag for the current record
			return retv;
		}
}
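
To see what these overridden hooks do for a single record, here is a tiny standalone sketch. It is illustrative only: JoinMapperDemo is a made-up name, and it extends JoinMapper purely so the protected methods are visible outside the framework.

import org.apache.hadoop.io.Text;

import com.demo.mappers.JoinMapper;
import com.demo.writables.TaggedWritable;

public class JoinMapperDemo extends JoinMapper {
		public static void main(String[] args) {
			JoinMapperDemo m = new JoinMapperDemo();
			// tag an orders record with its data source name, as the framework would
			TaggedWritable rec = new TaggedWritable(new Text("3,A,12.95,02-Jun-2008"));
			rec.setTag(m.generateInputTag("orders"));
			// the group key is the first comma-separated field, i.e. the customer id
			System.out.println(m.generateGroupKey(rec)); // prints 3
		}
}

In the real job, the base class calls generateInputTag() once when the task is configured and then runs every input line through generateTaggedMapOutput() and generateGroupKey(), emitting (group key, tagged record) pairs.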

The JoinReducer class extends DataJoinReducerBase:

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

import com.demo.writables.TaggedWritable;

public class JoinReducer extends DataJoinReducerBase {

		// The two argument arrays always have the same length, which is at most the number of data sources.
		@Override
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
			if (tags.length < 2)
					return null; // this check implements an inner join
			String joinedStr = "";
			for (int i = 0; i < values.length; i++) {
					if (i > 0)
						joinedStr += ","; // use a comma to separate the records from the two data sources
					TaggedWritable tw = (TaggedWritable) values[i];
					String line = ((Text) tw.getData()).toString();

					String[] tokens = line.split(",", 2); // split the record in two and drop the leading group key
					joinedStr += tokens[1];
			}
			TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
			retv.setTag((Text) tags[0]); // set retv's tag; the group key becomes the final output key
			return retv;
		}
}
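
The framework calls combine() once for every cross-product combination of records sharing a group key, with at most one record per data source. The sketch below feeds it one customers record and one orders record for customer 3 by hand; it is illustrative only, JoinReducerDemo is a made-up name, and it extends JoinReducer just to reach the protected method.

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

import com.demo.reducers.JoinReducer;
import com.demo.writables.TaggedWritable;

public class JoinReducerDemo extends JoinReducer {
		public static void main(String[] args) {
			Text custTag = new Text("customers");
			Text ordTag = new Text("orders");

			TaggedWritable customer = new TaggedWritable(new Text("3,Jose Madriz,281-330-8004"));
			customer.setTag(custTag);
			TaggedWritable order = new TaggedWritable(new Text("3,A,12.95,02-Jun-2008"));
			order.setTag(ordTag);

			JoinReducerDemo r = new JoinReducerDemo();
			TaggedMapOutput joined = r.combine(new Object[] { custTag, ordTag },
					new Object[] { customer, order });
			System.out.println(joined.getData()); // Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
		}
}

With the group key 3 as the output key, this is one of the two rows for customer 3 in the expected output; pairing customer 3 with order D produces the other.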

Finally, the driver class, the MapReduce program's entry point:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.demo.mappers.JoinMapper;
import com.demo.reducers.JoinReducer;
import com.demo.writables.TaggedWritable;

public class DataJoinDriver extends Configured implements Tool {

		public int run(String[] args) throws Exception {
			Configuration conf = getConf();
			if (args.length != 2) {
					System.err.println("Usage: DataJoin <input path> <output path>");
					System.exit(-1);
			}
			Path in = new Path(args[0]);
			Path out = new Path(args[1]);
			JobConf job = new JobConf(conf, DataJoinDriver.class);
			job.setJobName("DataJoin");
			//FileSystem hdfs =FileSystem.get(conf);
			FileSystem hdfs = in.getFileSystem(conf);
			FileInputFormat.setInputPaths(job, in);
			if (hdfs.exists(new Path(args[1]))) {
					hdfs.delete(new Path(args[1]), true);
			}
			FileOutputFormat.setOutputPath(job, out);
			job.setMapperClass(JoinMapper.class);
			job.setReducerClass(JoinReducer.class);
			job.setInputFormat(TextInputFormat.class);
			job.setOutputFormat(TextOutputFormat.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(TaggedWritable.class);
			JobClient.runJob(job);
			return 0;
		}

		public static void main(String[] args) throws Exception {
			// the input/output paths are hard-coded here; they override any command-line arguments
			args = new String[]{"hdfs://localhost:9000/input/different datasource data/*.txt","hdfs://localhost:9000/output/secondOutput1"};
			int res = ToolRunner.run(new Configuration(), new DataJoinDriver(), args);
			System.exit(res);
		}

}

Program output:



