MapReduce处理表的自连接,mapreduce处理表


原始数据

/*
 * 原始数据
 * 子    父
 * Tom Lucy
 Tom Jack
 Jone Locy
 Jone Jack
 Lucy Mary
 Lucy Ben
 Jack Alice
 Jack Jesse
 TerryAlice
 TerryJesse
 PhilipAlma
 Mark Terry
 Mark Alma
 */

要求通过子父关系找出子-祖母关系

/*
 * 设计方法:连接的左表的parent列(key),右表的child列(key),且左右表属于同一张表
 * 所以在map阶段将读入数据分割成child,parent后,会将parent设置成key,child设置成value输出,并作为左表
 * 再将同一对child和parent中的child作为key,parent作为value进行输出,作为右表
 * 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的string最开始出加上字符1表示左表,加上2表示右表。
 * 然后在shuffle过程中完成连接,reduce接收到连接的结果,其中每个key的value-list就包含了“grandchild-grandparent”关系。
 * 取出每个key的value-list进行解析,将左表中的child放入一个数组(就一个key),右表中的grandparent放入一个数组,然后对两个数组求笛卡儿积就ko了
 *
 */

1.Map类

package test.mr.selfrelated;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * 表的自连结(grandchild-grandparend表)
 */
/*
 * 原始数据
 * 子    父
 * Tom	Lucy
 Tom	Jack
 Jone	Locy
 Jone	Jack
 Lucy	Mary
 Lucy	Ben
 Jack	Alice
 Jack	Jesse
 TerryAlice
 TerryJesse
 PhilipAlma
 Mark	Terry
 Mark	Alma
 */
/*
 * 设计方法:连接的左表的parent列(key),右表的child列(key),且左右表属于同一张表
 * 所以在map阶段将读入数据分割成child,parent后,会将parent设置成key,child设置成value输出,并作为左表
 * 再将同一对child和parent中的child作为key,parent作为value进行输出,作为右表
 * 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的string最开始出加上字符1表示左表,加上2表示右表。
 * 然后在shuffle过程中完成连接,reduce接收到连接的结果,其中每个key的value-list就包含了“grandchild-grandparent”关系。
 * 取出每个key的value-list进行解析,将左表中的child放入一个数组(就一个key),右表中的grandparent放入一个数组,然后对两个数组求笛卡儿积就ko了
 * 
 */
public class selfRelatedMap extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.trim().length() > 0) {
			String str[] = line.split("\t");
			if (str.length == 2) {
				context.write(new Text(str[1]), new Text("1_" + str[0])); // 左表
				context.write(new Text(str[0]), new Text("2_" + str[1])); // 右表
			}
		}

	}
}


 

2.Reduce类

package test.mr.selfrelated;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class selfRelatedRedu extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		List<String> grandsons = new ArrayList<String>();
		List<String> grandparents = new ArrayList<String>();
		for (Text t : values) {
			// 进行value字符串切分
			String str[] = t.toString().split("_");
			if ("1".equals(str[0])) {
				// 左表 //作为孙
				grandsons.add(str[1]);
			} else if ("2".equals(str[0])) {
				// 右表 //作为祖母辈
				grandparents.add(str[1]);
			}
		}
		// 做笛卡尔积
		for (String gc : grandsons) {
			for (String gp : grandparents) {
				context.write(new Text(gc), new Text(gp));
			}
		}
	}
}


 

3.job类

package test.mr.selfrelated;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class selfRelatedMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(selfRelatedMain.class);

		job.setMapperClass(selfRelatedMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(selfRelatedRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}


 

相关内容