MapReduce的一对多连接操作


问题描述:
一个trade table表
product1"trade1
product2"trade2
product3"trade3
一个pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6

建立两个表之间的连接,该两表是一对多关系的
如下:
trade1pay1
trade1pay4
trade2pay2
...

思路:

为了将两个表整合到一起,由于有相同的第一列,且第一个表与第二个表是一对多关系的。
这里依然采用分组,以及组内排序,只要保证一方最先到达reduce端,则就可以进行迭代处理了。
为了保证第一个表先到达reduce端,可以为定义一个组合键,包含两个值,第一个值为product,第二个值为0或者1,来分别代表第一个表和第二个表,只要按照组内升序排列即可。

具体代码:

自定义组合键策略

package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
    //product1 0/1
    private String firstKey;//product1
    private int secondKey;//0,1;0代表是trade表,1代表是pay表
    //只需要保证trade表在pay表前面就行,则只需要对组顺序排列
                                                           
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
                                                           
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextIntPair tip=(TextIntPair)o;
        return this.getFirstKey().compareTo(tip.getFirstKey());
    }
}

分组策略

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
    protected TextComparator() {
        super(TextIntPair.class,true);//注册比较器
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair tip1=(TextIntPair)a;
        TextIntPair tip2=(TextIntPair)b;
        return tip1.getFirstKey().compareTo(tip2.getFirstKey());
    }
}

组内排序策略:目的是保证第一个表比第二个表先到达
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextIntPair.class,true);
    }
    //这里可以进行排序的方式管理
    //必须保证是同一个分组的
    //a与b进行比较
    //如果a在前b在后,则会产生升序
    //如果a在后b在前,则会产生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair ti1=(TextIntPair)a;
        TextIntPair ti2=(TextIntPair)b;
        //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
          return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
          return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
    }
                                     
}

分区策略:

package whut.onetomany;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionByText extends Partitioner<TextIntPair, Text> {
    @Override
    public int getPartition(TextIntPair key, Text value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

Hadoop权威指南(中文版-带目录索引)PDF
Hadoop权威指南(中文第2版)PDF
采用MapReduce与Hadoop进行大数据分析 

  • 1
  • 2
  • 下一页

相关内容