hadoop 左连接


hadoop2.2

场景描述:针对多个文件夹中的文件进行连接操作(单个文件夹内的文件格式相对统一),指定一个文件夹内的文件为主表(单个文件夹内的所有文件的主键唯一),所有在主表中的主键都要输出。

实现思路:在map中读入所有的文件,然后输出主键(默认每个文件每行的\t前面的数据)作为key,其他值+该文件的父目录作为value进行输出,在reduce中根据传入进入的主表父目录来对所有key进行判断,如果该key中的values(value的list)没有含有主表的父目录的标识,那么这个key就不是主表的key,也就意味着这个key可以不用输出,如果包含那么就要输出。

在reduce中输出含有key的values,需要解决下面的问题:1.保持values的相对顺序(来自每个目录的文件数据保持相对顺序);2.针对没有的数据进行填充(假如用null来填充)。

比如使用下面的数据:

/input/u/u1.txt:

user_id1	user_name_1	user_age_1
user_id2	user_name_2	user_age_2
user_id33	user_name_33	user_age_33
/input/u/u2.txt:

user_id11	user_name_11	user_age_11
user_id21	user_name_21	user_age_21
user_id31	user_name_31	user_age_31
/input/u/u3.txt:

user_id43	user_name_43	user_age_43
user_id42	user_name_42	user_age_42
user_id41	user_name_41	user_age_41
user_id45	user_name_45	user_age_45
/input/item/i1.txt:

user_id1	item_name1	item_column1
user_id2	item_name2	item_column2
user_id3	item_name3	item_column3
如果以/input/item作为主表,那么输出应该是:

user_id1	item_name1	item_column1	user_name_1	user_age_1	
user_id2	item_name2	item_column2	user_name_2	user_age_2	
user_id3	item_name3	item_column3	null	null
如果以/input/u作为主表,那么输出应该是:

user_id1	item_name1	item_column1	user_name_1	user_age_1	
user_id11	null	null	null	user_name_11	user_age_11	
user_id2	item_name2	item_column2	user_name_2	user_age_2	
user_id21	null	null	user_name_21	user_age_21	
user_id31	null	null	user_name_31	user_age_31	
user_id33	null	null	user_name_33	user_age_33	
user_id41	null	null	user_name_41	user_age_41	
user_id42	null	null	user_name_42	user_age_42	
user_id43	null	null	user_name_43	user_age_43	
user_id45	null	null	user_name_45	user_age_45
话不多说,上代码:

driver:

package org.fansy.hadoop.mr;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.fansy.hadoop.io.IoOperation;
import org.fansy.hadoop.mr.join.JoinMapper;
import org.fansy.hadoop.mr.join.JoinReducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 同一个目录的文件不能有相同的key
 * @author fansy
 *
 */
public class JoinDemoDriver01 {

	/**
	 * 在Map中读取文件的名字,然后在reduce中就可以按照一定的顺序进行输出,保持顺序一致性
	 * 默认第一列为输出key,分隔符暂时定为'\t'
	 * @param args
	 */
	private static Logger log = LoggerFactory.getLogger(JoinDemoDriver01.class);
	public static final String DIRANDCOLUMNS= "dirAndColumns";
	public static final String JOINOP="joinOp";
	public static final String KEYTABLE	="keyTable";
		
    public static void printUsage(){
    	System.out.println("JoinDemo01 [-r <reduces>] " +
                "[-joinOp <inner|outer|leftJoin>] " +
    			"[-keyInput <keyInput>]"+
                "[input]* <input> <output>");
    }
	public static void main(String[] args) throws Exception {
	  Configuration conf = new Configuration();
	  List<String> otherArgs = new ArrayList<String>();
	  int num_reduces= 1;
	  String op="leftJoin";
	  String keyTable = "";
	  for(int i=0; i < args.length; ++i) {
	      try {
	        if ("-r".equals(args[i])) {
	          num_reduces = Integer.parseInt(args[++i]);
	        }else if("-keyInput".equals(args[i])){
	        	keyTable=args[++i];
	        } else if ("-joinOp".equals(args[i])) {
	          op = args[++i];
	        } else {
	          otherArgs.add(args[i]);
	        }
	      }catch(Exception e){
	    	e.printStackTrace();  
	    	printUsage(); // exits
	      }
	  }
	  if (otherArgs.size() < 2) {
	      System.out.println("ERROR: Wrong number of parameters: ");
	      printUsage();
	    }
	  conf.set(JOINOP, op);
	  conf.set(KEYTABLE, getNameFromDir(keyTable));
	  Job job = new Job(conf,"join demo01");
      job.setJobName("join demo");
      job.setJarByClass(WordCount.class);
    //  job.setOutputFormatClass(SequenceFileOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      
      job.setMapperClass(JoinMapper.class);
	//  job.setCombinerClass(JoinReducer.class);
      job.setReducerClass(JoinReducer.class);
      job.setNumReduceTasks(num_reduces);
      FileOutputFormat.setOutputPath(job, new Path(otherArgs.remove(otherArgs.size() - 1)));
      
      TreeMap<String ,Integer> treeMap= new TreeMap<String,Integer>();
      for (int i=0;i<otherArgs.size();i++) {
        treeMap.put(getNameFromDir(otherArgs.get(i)), IoOperation.getColumnsFromDir(otherArgs.get(i), "\t"));
        FileInputFormat.addInputPath(job, new Path(otherArgs.get(i)));
      }
      
      StringBuffer fileDirAndColumns = new StringBuffer();
      for(Map.Entry<String, Integer> a:treeMap.entrySet()){
			fileDirAndColumns.append(a.getKey()+":"+a.getValue()+",");
      }
      // set dir name and columns 
      // 按照dir排过序的
      job.getConfiguration().set(DIRANDCOLUMNS, fileDirAndColumns.substring(0, fileDirAndColumns.length()-1));
	  System.exit(job.waitForCompletion(true)?0:1);
    }
	
	private static String getNameFromDir(String dir){
		int index= dir.lastIndexOf("/");
		if (index==dir.length()-1){
			String temp= dir.substring(0,index);
			return temp.substring(temp.lastIndexOf("/")+1);
		}else{
			return dir.substring(index+1);
		}
	}

}

mapper:

package org.fansy.hadoop.mr.join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
	private static Logger log = LoggerFactory.getLogger(JoinMapper.class);
	private  String filename= null;
	private  Text keyText= new Text();
	private Text valueText= new Text();
	public void setup(Context cxt){
	   InputSplit input=cxt.getInputSplit();
	   filename=((FileSplit) input).getPath().getParent().getName();
	   log.info("file name:"+filename);
	}
	   
     public void map(LongWritable key, Text value, Context cxt) throws IOException,InterruptedException {
      String[] values= value.toString().split("\t");
      // 第一个\t前面为key,后面是value
      String valueLeft = value.toString().substring((values[0]+"\t").length());
      keyText.set(values[0]);
      valueText.set(valueLeft+"<-->"+filename);  // ***此处的分隔符 可以自己定制,需要和数据区分开来,即数据中不应该含有此分隔符
      cxt.write(keyText,valueText);
     }
}
reducer:

package org.fansy.hadoop.mr.join;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.fansy.hadoop.mr.JoinDemoDriver01;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JoinReducer extends Reducer<Text,Text,Text,Text>{
	/**
	 * 1.同一个目录不能含有相同的key
	 * 2.不管何种方式的关联,都要保持列数不变,同时列排序不变
	 * 3.inner方式,如果接受的values不够目录的个数,那么就说明有一个或多个文件中没有这个key,那么这条记录就不用输出
	 * 4.outer方式,如果接受的values不够目录的个数,那么就要使用某个不够文件个数的列数(总列数-1),来填充,以达到保持列不缺少的情况
	 */
	private static Logger log = LoggerFactory.getLogger(JoinReducer.class);
	private String op=null;
	private String keyTable =null;
	private String dirAndColumns=null;
	private String[] dirs=null;
	private int[] columns=null;
	public void setup(Context cxt){
		System.out.println("JoinReducer setup");
		//  关联方式
		op=cxt.getConfiguration().get(JoinDemoDriver01.JOINOP);
		// 主表,可能是空
		keyTable=cxt.getConfiguration().get(JoinDemoDriver01.KEYTABLE);
		// 目录排序以及每个目录的列数
		dirAndColumns= cxt.getConfiguration().get(JoinDemoDriver01.DIRANDCOLUMNS);
		String[] dirAndColumnsArr= dirAndColumns.split(",");
		// 目录
		dirs= new String[dirAndColumnsArr.length];
		// 列数
		columns=new int[dirAndColumnsArr.length];
		// dirAndColumns已经排过序了
		for( int i=0;i<dirAndColumnsArr.length;i++){
			String[] tempDirColumn=dirAndColumnsArr[i].split(":");
			dirs[i]=tempDirColumn[0];
			columns[i]=Integer.parseInt(tempDirColumn[1]);
		}
		// 测试
		/*StringBuffer buff= new StringBuffer();
		for(int i=0;i<dirs.length;i++){
			buff.append("dir["+i+"]:"+dirs[i]).append(",").append("columns["+i+"]:"+columns[i]).append("\n");
		}
		log.info(buff.toString());
		log.info("keyTable:"+keyTable);*/
	}
    public void reduce(Text key, Iterable<Text> values, Context cxt) throws IOException,InterruptedException {
        if("leftJoin".equals(op)){
        	leftJoin(key,values,cxt);
        }else if("inner".equals(op)){
        	innerJoin(key,values,cxt);
        }else if("outer".equals(op)){
        	outerJoin(key,values,cxt);
        }else{
        	log.info("wrong join method");
        }
    	
    }
    /** 
     * 暂时未实现
     * @param key
     * @param values
     * @param cxt
     */
	private void outerJoin(Text key,
			Iterable<Text> values,
			Context cxt) {
		
	}
	/**
	 * 暂时未实现
	 * @param key
	 * @param values
	 * @param cxt
	 */
	private void innerJoin(Text key,
			Iterable<Text> values,
			Context cxt) {
		
	}
	/**
	 * 如果含有主表的key则考虑输出,否则直接退出
	 * @param key
	 * @param values
	 * @param cxt
	 * @throws IOException
	 * @throws InterruptedException
	 */
	private void leftJoin(Text key, Iterable<Text> values,
			Context cxt) throws IOException, InterruptedException {
		// 这里暂时考试使用TreeSet,后期如果效率不行,可以考虑使用其他数据存储结构
		boolean hasKeyTableKey=false;
    	TreeSet<JoinData>  treeSet = new TreeSet<JoinData>();
    	JoinData joinData =null;
	    for (Text v:values) {
	    	/*log.info("key:"+key.toString()+"--"+"v.toString():"+v.toString());*/
	    	String[] vArr= v.toString().split("<-->"); // ***和mapper中的分隔符对应
	    	joinData = new JoinData(vArr[1],vArr[0]); // 注意mapper中的顺序
	    	treeSet.add(joinData);
	    	if(joinData.getDir().equals(keyTable)){
	    		hasKeyTableKey=true;
	    	}
       }
	    // 如果不包含主表的key,那么就直接退出,不输出任何记录
	    if(!hasKeyTableKey){
	    	return ;
	    }
	    // 拼凑不够个数的values(某个文件夹中的key没有则会导致这种情况)
	    StringBuffer buff = new StringBuffer();
	    /* int i=0;
	    for(JoinData v:treeSet){
	    	while(!v.getDir().equals(dirs[i])){
	    		buff.append(getNumsNull(columns[i],"\t")).append("\t"); // ***此处getNumsNull可以替换为其他字符
	    		i++;
	    	}
	    	buff.append(v.getValue().toString()).append("\t");
	    	i++;
	    }*/
	    // 这里设计不合理,后期改进
	    boolean flag=false;
	    for(int i=0;i<dirs.length;i++){
	    	flag=false;
	    	for(JoinData v:treeSet){
	    		if(dirs[i].equals(v.getDir())){
	    			buff.append(v.getValue()).append("\t");
	    			flag=true;
	    		}
	    	}
	    	if(!flag){
	    		buff.append(getNumsNull(columns[i]-1,"\t")).append("\t"); // ***此处getNumsNull可以替换为其他字符,减1是因为除去主键
	    	}
	    }
	    cxt.write(key, new Text(buff.toString()));
	}
	private String getNumsNull(int num,String splitter){
		StringBuffer buff = new StringBuffer();
		for(int i=0;i<num-1;i++){
			buff.append("null"+splitter);
		}
		buff.append("null");
		return buff.toString();
	}
}

目前,暂时只实现了含有主表的左连接,其他内连接和外连接暂时没有实现。

同时代码中还有些地方有待改进,而且在reduce中实现连接是比较慢的,这点在查阅了一些文章后也可以得到。在hadoop的example中实现的是在map端的连接,但是其需要输入文件是已经排过序的,这点可能需要在运行Join之前先做一步排序,而且其不支持左连接。其三种方式为:inner:所有文件中共有的key输出,是交集;outer:所有文件的key都输出,是并集;override:越靠近输出文件的文件(在输出参数中)其value值替换越不靠近的。

最后附上调用上面代码的命令:

./yarn jar yarn.jar org.fansy.hadoop.mr.JoinDemoDriver01 -joinOp leftJoin -keyInput /input/item /input/u /input/item /output/joindemo/008
其中,打包好的yarn.jar在http://download.csdn.net/detail/fansy1990/7008039可以下载。


分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990


相关内容

    暂无相关文章