A basic two-table join in MapReduce (reduce-side join)
Example: a join query over the department and employee tables
Raw data
Employee table (emp):
empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark manager 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-22 1250 1400 30 boston
7900 james clerk 7698 1981-03-20 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20
Department table (dep):
deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork
The goal is equivalent to the SQL query: select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
Use the field in the join's on condition (deptno) as the key.
This is a one-to-many relationship: one department maps to many employees.
Analysis:
The final output contains four attributes drawn from the two tables (employee id, employee name, department id (the foreign key), department name).
We can use the department id as the key the mapper emits, and wrap the four attributes in a JavaBean as the emitted value. Besides the four attributes, the custom JavaBean also needs a flag field that records whether the record came from the employee table or the department table.
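Before looking at the Hadoop classes, the approach above can be sketched in plain Java (no Hadoop on the classpath). This is only an illustration of the join logic, with hypothetical names (`ReduceSideJoinSketch`, `join`) that do not appear in the actual job: build a deptno-to-dname lookup from the "one" side, then stitch the dname onto each record of the "many" side.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for what the shuffle + reducer accomplish: group by deptno,
// then join the single department record with the many employee records.
public class ReduceSideJoinSketch {
    // deps rows: {deptno, dname}; emps rows: {empno, ename, deptno}
    public static List<String> join(String[][] deps, String[][] emps) {
        // deptno -> dname (the "one" side of the relationship)
        Map<String, String> dnameByDeptno = new HashMap<>();
        for (String[] d : deps) {
            dnameByDeptno.put(d[0], d[1]);
        }
        // stitch dname onto each employee (the "many" side)
        List<String> out = new ArrayList<>();
        for (String[] e : emps) {
            out.add(e[0] + "\t" + e[1] + "\t" + e[2] + "\t" + dnameByDeptno.get(e[2]));
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] deps = { {"30", "sales"}, {"20", "research"} };
        String[][] emps = { {"7499", "allen", "30"}, {"7788", "scott", "20"} };
        for (String line : join(deps, emps)) {
            System.out.println(line);
        }
    }
}
```

In the real job, the shuffle performs the grouping for us: every Bean with the same deptno arrives at the same reduce() call.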
1. The JavaBean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
 * A single bean shared by the two tables being joined, with a flag field
 * used to tell the two tables apart.
 * It implements WritableComparable (the data must be serializable to travel
 * across the network, and Hadoop needs to group and sort it).
 */
public class Bean implements WritableComparable<Bean> {
    // the attributes queried from the two tables
    private String empno = "";
    private String empname = "";
    private String depno = "";
    private String depname = "";
    private int flag = 0; // 0: department, 1: employee

    public Bean() {
    }

    public Bean(String empno, String empname, String depno, String depname, int flag) {
        this.empno = empno;
        this.empname = empname;
        this.depno = depno;
        this.depname = depname;
        this.flag = flag;
    }

    // copy constructor, used by the reducer to keep its own copy of a value
    public Bean(Bean bean) {
        this.empno = bean.getEmpno();
        this.empname = bean.getEmpname();
        this.depno = bean.getDepno();
        this.depname = bean.getDepname();
        this.flag = bean.getFlag();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // serialize the fields
        out.writeUTF(empno);
        out.writeUTF(empname);
        out.writeUTF(depno);
        out.writeUTF(depname);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize the fields, in exactly the order write() produced them
        this.empno = in.readUTF();
        this.empname = in.readUTF();
        this.depno = in.readUTF();
        this.depname = in.readUTF();
        this.flag = in.readInt();
    }

    @Override
    public String toString() {
        return "empno=" + empno + ", empname=" + empname + ", depno=" + depno + ", depname=" + depname;
    }

    @Override
    public int compareTo(Bean other) {
        // the bean is only used as a map output value, never as a key,
        // so no meaningful ordering is needed
        return 0;
    }

    public String getEmpno() { return empno; }
    public void setEmpno(String empno) { this.empno = empno; }
    public String getEmpname() { return empname; }
    public void setEmpname(String empname) { this.empname = empname; }
    public String getDepno() { return depno; }
    public void setDepno(String depno) { this.depno = depno; }
    public String getDepname() { return depname; }
    public void setDepname(String depname) { this.depname = depname; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
}
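The one contract write() and readFields() must honor is field order: readFields() has to consume fields in exactly the order write() produced them. The round trip can be demonstrated with plain java.io streams (a sketch with a made-up class name, `WritableRoundTrip`, not part of the job itself):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Serialize an employee record the way Bean.write() does, then read it back
// the way Bean.readFields() does, over an in-memory byte buffer.
public class WritableRoundTrip {
    public static String roundTrip() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        // same field order as Bean.write(): empno, empname, depno, depname, flag
        out.writeUTF("7499");
        out.writeUTF("allen");
        out.writeUTF("30");
        out.writeUTF("");   // depname is empty here; the reducer fills it in
        out.writeInt(1);    // flag 1 = employee record

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        // read back in the same order, as Bean.readFields() does
        String empno = in.readUTF();
        String empname = in.readUTF();
        String depno = in.readUTF();
        in.readUTF();       // depname, unused here
        int flag = in.readInt();
        return empno + " " + empname + " " + depno + " " + flag;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip()); // prints: 7499 allen 30 1
    }
}
```

If write() and readFields() ever disagree on field order or types, deserialization silently produces garbage or throws mid-stream, which is why both methods list the five fields identically.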
2. The Mapper (the map output key is the deptno used in the join; the value is a Bean object; at reduce time, values sharing a key are grouped together)
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * The map output key is the deptno used in the join; the value is a Bean.
 * At reduce time, values sharing the same key are grouped together.
 * The same Bean class handles records from both emp and dep.
 */
public class StaffDepMap extends Mapper<LongWritable, Text, IntWritable, Bean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // input is assumed tab-separated with empty fields preserved,
        // so deptno always sits at index 7 of an employee line
        String[] str = line.split("\t");
        // decide which table the line came from by its field count
        if (str.length == 3) {
            // department record
            Bean dep = new Bean();
            dep.setDepno(str[0]);
            dep.setDepname(str[1]);
            dep.setFlag(0);
            // emit the department record keyed by deptno
            context.write(new IntWritable(Integer.parseInt(str[0])), dep);
        } else {
            // employee record
            Bean emp = new Bean();
            emp.setEmpno(str[0]);
            emp.setEmpname(str[1]);
            emp.setDepno(str[7]);
            emp.setFlag(1);
            // emit the employee record keyed by deptno
            context.write(new IntWritable(Integer.parseInt(str[7])), emp);
        }
    }
}
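The length-based table detection can be checked in isolation; under the tab-separated assumption above, a department line splits into 3 fields and a full employee line into 8 (the hypothetical `SplitCheck` class exists only for this demonstration):

```java
// Verify the field counts the mapper relies on to tell the tables apart.
public class SplitCheck {
    // returns the number of tab-separated fields in one input line
    public static int fieldCount(String line) {
        return line.split("\t").length;
    }

    public static void main(String[] args) {
        System.out.println(fieldCount("30\tsales\tchicago")); // 3 -> department
        System.out.println(fieldCount(
            "7499\tallen\tsalesman\t7698\t1981-02-20\t1600\t300\t30")); // 8 -> employee
    }
}
```

Note this only works if empty fields (such as a missing comm) are kept as empty strings between tabs; if trailing fields are dropped instead, an employee line could split into fewer than 8 fields and str[7] would fail.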
3. The Reducer (the input key is the deptno shared by the two tables)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class StaffDepRedu extends Reducer<IntWritable, Bean, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Bean> values, Context context)
            throws IOException, InterruptedException {
        Bean dep = null;
        List<Bean> emps = new ArrayList<Bean>();
        for (Bean bean : values) {
            if (bean.getFlag() == 0) {
                // department record; copy it, because Hadoop reuses the value object
                dep = new Bean(bean);
            } else {
                // employee record; copy it for the same reason
                emps.add(new Bean(bean));
            }
        }
        if (dep == null) {
            // no department record for this deptno: an inner join emits nothing
            return;
        }
        // fill in the department's dname on every employee record
        for (Bean emp : emps) {
            emp.setDepname(dep.getDepname());
            context.write(NullWritable.get(), new Text(emp.toString()));
        }
    }
}
4. The driver (job) class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StaffDepMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf); // new Job(conf) is deprecated

        job.setJarByClass(StaffDepMain.class);

        job.setMapperClass(StaffDepMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Bean.class);

        job.setReducerClass(StaffDepRedu.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}