hadoop 多数据源连接之DataJoin,hadoopdatajoin


一个MapReduce任务很可能访问和处理两个甚至多个数据集,在关系型数据库中,这将是两个或者多个表的连接,但是Hadoop系统没有关系型数据库中那样强大的连接处理功能,因此处理复杂一些。一般来讲,hadoop可以采用这几种数据连接方式:

        1采用DataJoin类库实现Reduce端连接的方法

        2 用全局文件复制实现Map端连接方法

        3 带Map端过滤的Reduce端连接方法

   Hadoop的Mapreduce框架提供了一种较为通用  的多数据源连接方法,该方法用DataJoin类库为程序员提供了完成数据连接所需的编程框架和接口,其处理方法如下:

         为了完成不用数据源的连接操作,我们必须给每个数据源制定一个标签(tag),用来区分数据,就像关系型数据库中表名一样,这里我们需要实现 Text generateInputTag(String inputFile)方法;

         另外,为了进行连接操作,我们必须知道连接的主键是什么,类似于关系型数据库中的key,因此我们需要指定groupKey,这里我们需要实现 Text generateGroupKey(TaggedMapOutput aRecord)

         然后在Map端我们需要把原始数据包装成为一个带标签的数据记录,方便shuffle和Reduce端执行笛卡尔积,所以我们需要实现 TaggedMapOutput generateTaggedMapOutput(Object value);


总结一下Map处理过程:

   Datajoin类库首先提哦功能管理一个抽象基类DataJoinMapperBase,该基类实现了map()方法,帮助程序员对每个数据源下的记录生成一个代标签的数据记录对象。Map端处理过程中,需要指定标签tag和Groupkey,然后包装成为带标签的数据记录对象,在shuffle过程中,这些GroupKey相同的记录被分到同一个Reduce节点上。

Reduce处理过程:

      Reduce节点收到这些带标签的数据记录后,Reduce过程将这些带不同的数据源标签的记录执行笛卡尔积,自动生成所有不同的叉积组合,由程序员实现一个combine()方法,根据应用程序需求将这些具有相同的Groupkey的数据记录进行适当的合并处理,以此完成类似于关系型数据库中不同实体数据记录之间的连接。

     在Reduce阶段我们需要继承DataJoinReduceBase,该基类实现了reduce()方法,我们只是需要实现combine()方法即可,另外我们还是需要继承TaggedMapOutput类,它描述了一个标签化的数据记录,实现了getTag(),setTag()方法,作为Mapper的key_value输出value类型,由于需要I/O,我们需要继承并且实现Writable接口,并且实现getData()方法用以读取记录数据

   下面是数据源:

        user.txt文件:

1,张三,135xxxxxxxx
2,李四,136xxxxxxxx
3,王五,137xxxxxxxx
4,赵六,138xxxxxxxx

order.txt文件:

3,A,13,2013-02-12
1,B,23,2013-02-14
2,C,16,2013-02-17
3,D,25,2013-03-12

这其中需要注意很多小细节,因为没有要求程序员实现Map和reduce方法,所以我们会很容易忽略很多东西,需要注意的东西我在下面一一注释了:

我们必须使用Jobconf 来声明一个job,同时使用JobClient来run job,另外我们在继承TaggedMapOutput的时候默认的无参构造方法中需要初始化data

package joinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class DataJoin{

	public static class MyTaggedMapOutput extends TaggedMapOutput{
		private Writable data;
	    
		public MyTaggedMapOutput(){
			//一定要new一下,不然反序列化时会报空指针异常
			this.data=new Text();
		}
		
		public MyTaggedMapOutput(Writable data){
			this.tag=new Text("");
			this.data=data;
		}

		public void readFields(DataInput in) throws IOException {
			this.tag.readFields(in);
			this.data.readFields(in);
		}

		public void write(DataOutput output) throws IOException {
			this.tag.write(output);
			//this.tag.write(output);     //大问题,粗心写成了this.tag.write(output); 结果一直报错
			this.data.write(output);
		}

		@Override
		public Writable getData() {
			return data;
		}
	}
	
	
	public static class DataJoinMapper extends DataJoinMapperBase{
		
		protected Text generateInputTag(String inputFile){
				//String datasource=inputFile.substring(inputFile.lastIndexOf("/")+1).split("\\.")[0];
			    String datasource = inputFile.split("-")[0];
				System.out.println("datasource:"+datasource);
				return new Text(datasource);
		}
		
		protected TaggedMapOutput generateTaggedMapOutput(Object value){
			TaggedMapOutput tm=new MyTaggedMapOutput((Text)value);
			tm.setTag(this.inputTag);
			return tm;
		}

		@Override
		protected Text generateGroupKey(TaggedMapOutput aRecord) {
			String line=aRecord.getData().toString();
			//String groupkey=line.split("\\s")[0];
			String groupkey=line.split(",")[0];
			return new Text(groupkey);
		}
	}
	
	public static class DataJoinReducer extends DataJoinReducerBase{

		@Override
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
			if(tags.length<2){
				return null;
			}
			String output = "";
		   /* for(int i=0;i<tags.length;i++){
		    	TaggedMapOutput tat=(MyTaggedMapOutput)values[i];System.out.println("tags:"+tags[i]+"    values:"+tat.getData().toString());
		    	if(i==0){
		    	     output=tat.getData().toString();//System.out.println("i==0  output:"+output);
		    	}else{
		    		output+="\t";
		    		String [] s=tat.getData().toString().split("\\s",2);
		    		System.out.println("s.length:"+s.length);
		    		output+=s[0];
		    	}*/
			for(int j=0;j<tags.length;j++){
				//TaggedMapOutput taOutput=(TaggedMapOutput)tags[j];
				TaggedMapOutput taggedMapOutput=(TaggedMapOutput)values[j];
				System.out.println("tag:"+taggedMapOutput.getTag()+"  value:"+taggedMapOutput.getData().toString());
			}
			     for(int i=0;i<values.length;i++){
			    	 TaggedMapOutput tat=(MyTaggedMapOutput)values[i];
			    	 String  recordLine=((Text)tat.getData()).toString();
			    	 String [] tokens=recordLine.split(",",2);System.out.println("data:"+recordLine);
			    	 if(i>0)
			    		 output+=",";
			    	output+=tokens[1];
		    }
		    TaggedMapOutput tag=new MyTaggedMapOutput(new Text(output));
		    tag.setTag((Text)tags[0]);
		    return tag;
		}
	}
	
	/*
	 * 这里一定要注意,FileInputFormat和FileOutputFormat一定要是org.apache.hadoop.mapred下面的包
	 */
	public static int run(String args[]) throws IOException{
		       Configuration conf=new Configuration();
				//Configuration conf=getConf();
				JobConf job=new JobConf(conf, DataJoin.class);
				//Job job=new Job(conf,"DataJoin");
				job.setJobName("DataJoin");
			//	job.setJarByClass(DataJoin.class);
				
				job.setMapperClass(DataJoinMapper.class);
				job.setReducerClass(DataJoinReducer.class);
				job.setInputFormat(TextInputFormat.class);
				job.setOutputFormat(TextOutputFormat.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(MyTaggedMapOutput.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(Text.class);
				
				//job.set("mapred.textoutputformat.separator", "\t");
				job.set("mapred.textoutputformat.separator", ",");
				//FileInputFormat.addInputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2"));
				//FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2"));
				//FileInputFormat.addInputPaths(job,  "/home/hadoop/test/DataJoinTest/province.txt");
				//MultipleInputs.addInputPath(job, new Path("/home/hadoop/test/DataJoinTest/province.txt"), TextInputFormat.class);
				FileInputFormat.addInputPaths(job, args[0]);
				//FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2_Out"));
				//FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2_result"));
				FileOutputFormat.setOutputPath(job, new Path(args[1]));  
				JobClient.runJob(job);
				return 0;
	}
	public static void main(String [] args) throws Exception{
		String[] arg = { "/home/hadoop/test/mapReduce/DataJoinTest2",
		"/home/hadoop/test/mapReduce/DataJoinTest2_result" };
		int res=run(arg);
		System.exit(res);
		
	}
	
}





相关内容