hadoop复合键排序使用方法,hadoop复合使用方法
hadoop复合键排序使用方法,hadoop复合使用方法
在hadoop中处理复杂业务时,需要用到复合键,复合不同于单纯的继承Writable接口,而是继承了WritableComparable<T>接口,而实际上,WritableComparable<T>接口继承了Writable和Comparable<T>接口,如果只需要使用某一个类作为传值对象而不是作为key,继承Writable接口即可。
上源码:
public interface WritableComparable<T> extends Writable, Comparable<T> { }
public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
public interface Comparable<T> { public int compareTo(T o); }以下是实现复合key的实例,亲测,可用
public class SortKey implements WritableComparable<SortKey>{ private Text name; private IntWritable right; public SortKey() { set(new Text(), new IntWritable()); } public SortKey(Text name, IntWritable right) { set(name, right); } private void set(Text name,IntWritable right){ this.name = name; this.right = right; } /** * @return the name */ public Text getName() { return name; } /** * @param name the name to set */ public void setName(Text name) { this.name = name; } /** * @return the right */ public IntWritable getRight() { return right; } /** * @param right the right to set */ public void setRight(IntWritable right) { this.right = right; } @Override public void write(DataOutput out) throws IOException { name.write(out); right.write(out); } @Override public void readFields(DataInput in) throws IOException { name.readFields(in); right.readFields(in); } @Override public int compareTo(SortKey o) { int cmp = name.compareTo(o.name); if(cmp != 0){ return cmp; }else{ return right.compareTo(o.right); } }
<span style="white-space:pre"> </span>//到目前为止,你只能将其作为key来使用,但是如果你需要按照key的某一个值来排序,以下是重点
static{ WritableComparator.define(SortKey.class, new Comparator()); } public static class Comparator extends WritableComparator{ private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); protected Comparator() { super(SortKey.class); } /* (non-Javadoc) * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int) */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try{ int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); }catch(Exception e){ throw new IllegalArgumentException(e); } } } }
基本MapReduce模式
计数与求和
问题陈述:
有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
解决方案:
让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合然后把他们的频次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Reducer
7 method Reduce(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Reducer的数据量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Reducer
14 method Reduce(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
应用:Log 分析, 数据查询
整理归类
问题陈述:
有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每个条......余下全文>>
hi.baidu.com/...163544
一篇文章,讲得挺清楚
评论暂时关闭