Hadoop Secondary Sort

Input (two tab-separated columns):

1       3
1       2
1       1
3       3
3       2
2       2
2       1
3       1

After sorting:

1 1
1 2
1 3
2 1
2 2
3 1
3 2
3 3
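
The trick is to pack both columns into one composite key; the shuffle phase then sorts map output by that key's compareTo, which produces exactly this ordering. As a minimal standalone sketch of the same ordering rule in plain Java (OrderDemo is an illustrative name, not part of the job):

import java.util.Arrays;

public class OrderDemo {
    public static void main(String[] args) {
        // The sample input from above, as (first, second) pairs.
        long[][] pairs = {{1, 3}, {1, 2}, {1, 1}, {3, 3}, {3, 2}, {2, 2}, {2, 1}, {3, 1}};
        // Same rule as SortKey.compareTo: first column, then second column.
        Arrays.sort(pairs, (a, b) -> {
            int cmp = Long.compare(a[0], b[0]);
            return cmp != 0 ? cmp : Long.compare(a[1], b[1]);
        });
        for (long[] p : pairs) {
            System.out.println(p[0] + "\t" + p[1]);
        }
    }
}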

The code:

package com.hadoop.test.SecondSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key holding both columns. The shuffle sorts map output by this
 * key's compareTo, so emitting (first, second) as the key sorts on both fields.
 */
public class SortKey implements WritableComparable<SortKey> {
    private Long first;
    private Long second;

    public SortKey() {
        // Hadoop needs the no-arg constructor to deserialize the key.
    }

    public SortKey(Long first, Long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.first);
        out.writeLong(this.second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int compareTo(SortKey o) {
        // Compare by value with compareTo, not with != (which compares Long
        // references), and avoid the subtraction trick, which can overflow.
        int cmp = this.first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return this.second.compareTo(o.second);
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() * 31 + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SortKey)) {
            return false;
        }
        SortKey key = (SortKey) obj;
        // Compare boxed Longs with equals(), not ==.
        return this.first.equals(key.first) && this.second.equals(key.second);
    }

    public Long getFirst() {
        return first;
    }

    public void setFirst(Long first) {
        this.first = first;
    }

    public Long getSecond() {
        return second;
    }

    public void setSecond(Long second) {
        this.second = second;
    }
}
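

As written, every distinct (first, second) pair forms its own reduce group, which is enough to get fully sorted output. The classic secondary-sort pattern goes one step further: partition and group on the first field only, so a single reduce() call sees all second values for one first value, already sorted by the composite key. A sketch of the two extra classes that would require (FirstPartitioner and FirstGroupingComparator are illustrative names, each in its own file; they are not part of the original code):

package com.hadoop.test.SecondSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Route all keys with the same first field to the same reducer.
public class FirstPartitioner extends Partitioner<SortKey, LongWritable> {
    @Override
    public int getPartition(SortKey key, LongWritable value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

package com.hadoop.test.SecondSort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Group reduce input on the first field only.
public class FirstGroupingComparator extends WritableComparator {
    protected FirstGroupingComparator() {
        super(SortKey.class, true); // true: create key instances for comparing
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((SortKey) a).getFirst().compareTo(((SortKey) b).getFirst());
    }
}

They would be registered in JobMain with job.setPartitionerClass(FirstPartitioner.class) and job.setGroupingComparatorClass(FirstGroupingComparator.class); the reducer would then read the changing second field inside the values loop rather than once per call.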



package com.hadoop.test.SecondSort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, SortKey, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        // Check the field count before parsing; otherwise a malformed line
        // throws ArrayIndexOutOfBoundsException.
        if (arr.length == 2) {
            SortKey temp = new SortKey(Long.parseLong(arr[0]), Long.parseLong(arr[1]));
            context.write(temp, new LongWritable(Long.parseLong(arr[1])));
        }
    }
}



package com.hadoop.test.SecondSort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<SortKey, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortKey k2, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Without a custom grouping comparator, each distinct (first, second)
        // pair is its own group; emit one output line per value in the group.
        for (LongWritable value : values) {
            context.write(new LongWritable(k2.getFirst()), new LongWritable(k2.getSecond()));
        }
    }
}


package com.hadoop.test.SecondSort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Keys are sorted in ascending order by default.
 * @author 小明
 */
public class JobMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(configuration, "secondary-sort");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortKey.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists, so reruns don't fail.
        Path outputDir = new Path(args[1]);
        FileSystem sys = FileSystem.get(configuration);
        if (sys.exists(outputDir)) {
            sys.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
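
To run the job, package the classes into a jar and pass the input and output paths as arguments; the jar name and paths below are only illustrative:

hadoop jar secondsort.jar com.hadoop.test.SecondSort.JobMain /input/sortdata /output/sortresult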


