Hadoop二次排序的其他写法,hadoop二次排序


                                              Hadoop二次排序的其他写法

二次排序原理
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
本例子中使用的是TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。
这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。
注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。
在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。
每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。 
如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。 
在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。 


在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。
然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。
只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。
最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。 


核心总结:
1、map最后阶段进行partition分区,一般使用job.setPartitionerClass设置的类,如果没有自定义Key的hashCode()方法进行排序。
2、每个分区内部调用job.setSortComparatorClass设置的key的比较函数类进行排序,如果没有则使用Key的实现的compareTo方法。
3、当reduce接收到所有map传输过来的数据之后,调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用Key的实现的compareTo方法。
4、紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个Key的value放在一个迭代器里面。


2、如何自定义Key
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法 
//反序列化,从流中的二进制转换成IntPair 
public void readFields(DataInput in) throws IOException 

//序列化,将IntPair转化成使用流传送的二进制 
public void write(DataOutput out) 


//key的比较 
public int compareTo(IntPair o) 

另外新定义的类应该重写的两个方法 
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) 
public int hashCode() 
public boolean equals(Object right)  


3、如何自定义分区函数类。这是key的第一次比较。 
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable> 


在job中设置使用setPartitionerClasss  
4、如何自定义key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。 
public static class KeyComparator extends WritableComparator 
必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2) 
另一种方法是 实现接口RawComparator。 
在job中设置使用setSortComparatorClass。  
5、如何自定义分组函数类。
在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。 
public static class GroupingComparator extends WritableComparator 
同key比较函数类,必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2) 
同key比较函数类,分组函数类另一种方法是实现接口RawComparator。 
在job中设置使用setGroupingComparatorClass。  


6、案例分析
数据准备:
假如我们现在的需求是先按 cookieId 排序,然后按 time 排序,以便按 session 切分日志


cookieId    time    url
2   12:12:34 2_hao123
3   09:10:34 3_baidu
1   15:02:41    1_google
3   22:11:34    3_sougou
1   19:10:34    1_baidu
2   15:02:41    2_google
1   12:12:34    1_hao123
3   23:10:34    3_soso
2   05:02:41    2_google
 
结果:
------------------------------------------------
1       12:12:34        1_hao123
1       15:02:41        1_google
1       19:10:34        1_baidu
------------------------------------------------
2       05:02:41        2_google
2       12:12:34        2_hao123
2       15:02:41        2_google
------------------------------------------------
3       09:10:34        3_baidu
3       22:11:34        3_sougou
3       23:10:34        3_soso
案例2:对两列数据进行排序,第一列相同的比较第二列
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8




7、代码实现
package SecondarySort;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class SecondarySort
{
   //自己定义的key类应该实现WritableComparable接口
   public static class IntPair implements WritableComparable<IntPair>
   {
       String first;
       String second;
       /**
        * Set the left and right values.
        */
       public void set(String left, String right)
       {
           first = left;
           second = right;
       }
       public String getFirst()
       {
           return first;
       }
       public String getSecond()
       {
           return second;
       }
       //反序列化,从流中的二进制转换成IntPair
       public void readFields(DataInput in) throws IOException
       {
           first = in.readUTF();
           second = in.readUTF();
       }
       //序列化,将IntPair转化成使用流传送的二进制
       public void write(DataOutput out) throws IOException
       {
           out.writeUTF(first);
           out.writeUTF(second);
       }
       //重载 compareTo 方法,进行组合键 key 的比较,该过程是默认行为。
       //分组后的二次排序会隐式调用该方法。
       public int compareTo(IntPair o)
       {
           if (!first.equals(o.first) )
           {
               return first.compareTo(o.first);
           }
           else if (!second.equals(o.second))
           {
               return second.compareTo(o.second);
           }
           else
           {
               return 0;
           }
       }
 
       //新定义类应该重写的两个方法
       //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
       public int hashCode()
       {
           return first.hashCode() * 157 + second.hashCode();
       }
       public boolean equals(Object right)
       {
           if (right == null)
               return false;
           if (this == right)
               return true;
           if (right instanceof IntPair)
           {
               IntPair r = (IntPair) right;
               return r.first.equals(first) && r.second.equals(second) ;
           }
           else
           {
               return false;
           }
       }
   }
   /**
     * 分区函数类。根据first确定Partition。
     */
   public static class FirstPartitioner extends Partitioner<IntPair, Text>
   {
       public int getPartition(IntPair key, Text value,int numPartitions)
       {
           return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
       }
   }
 
   /**
    * 分组函数类。只要first相同就属于同一个组。
    */
   /*//第一种方法,实现接口RawComparator
   public static class GroupingComparator implements RawComparator<IntPair> {
       public int compare(IntPair o1, IntPair o2) {
           int l = o1.getFirst();
           int r = o2.getFirst();
           return l == r ? 0 : (l < r ? -1 : 1);
       }
       //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
       public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                    b2, s2, Integer.SIZE/8);
       }
   }*/
   //第二种方法,继承WritableComparator
   public static class GroupingComparator extends WritableComparator
   {
       protected GroupingComparator()
       {
           super(IntPair.class, true);
       }
       //Compare two WritableComparables.
       //  重载 compare:对组合键按第一个自然键排序分组
       public int compare(WritableComparable w1, WritableComparable w2)
       {
           IntPair ip1 = (IntPair) w1;
           IntPair ip2 = (IntPair) w2;
           String l = ip1.getFirst();
           String r = ip2.getFirst();
           return l.compareTo(r);
       }
   }
 
 
   // 自定义map
   public static class Map extends Mapper<LongWritable, Text, IntPair, Text>
   {
       private final IntPair keyPair = new IntPair();
       String[] lineArr = null;
       public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
       {
           String line = value.toString();
           lineArr = line.split("\t", -1);
           keyPair.set(lineArr[0], lineArr[1]);
           context.write(keyPair, value);
       }
   }
   // 自定义reduce
   //
   public static class Reduce extends Reducer<IntPair, Text, Text, Text>
   {
       private static final Text SEPARATOR = new Text("------------------------------------------------");
        
       public void reduce(IntPair key, Iterable<Text> values,Context context) throws IOException, InterruptedException
       {
           context.write(SEPARATOR, null);
           for (Text val : values)
           {
               context.write(null, val);
           }
       }
   }
 
   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
   {
       // 读取hadoop配置
       Configuration conf = new Configuration();
       // 实例化一道作业
       Job job = new Job(conf, "secondarysort");
       job.setJarByClass(SecondarySort.class);
       // Mapper类型
       job.setMapperClass(Map.class);
       // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
       //job.setCombinerClass(Reduce.class);
       // Reducer类型
       job.setReducerClass(Reduce.class);
       // 分区函数
       job.setPartitionerClass(FirstPartitioner.class);
       // 分组函数
       job.setGroupingComparatorClass(GroupingComparator.class);
 
       // map 输出Key的类型
       job.setMapOutputKeyClass(IntPair.class);
       // map输出Value的类型
       job.setMapOutputValueClass(Text.class);
       // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
       job.setOutputKeyClass(Text.class);
       // rduce输出Value的类型
       job.setOutputValueClass(Text.class);
 
       // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
       job.setInputFormatClass(TextInputFormat.class);
       // 提供一个RecordWriter的实现,负责数据输出。
       job.setOutputFormatClass(TextOutputFormat.class);
 
       // 输入hdfs路径
       FileInputFormat.setInputPaths(job, new Path(args[0]));
       // 输出hdfs路径
       FileSystem.get(conf).delete(new Path(args[1]), true);
       FileOutputFormat.setOutputPath(job, new Path(args[1]));
       // 提交job
       System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}




相关内容