MapReduce编程之倒排索引,mapreduce编程索引
MapReduce编程之倒排索引,mapreduce编程索引
任务要求:
//输入文件格式
18661629496 110
13107702446 110
1234567 120
2345678 120
987654 110
2897839274 18661629496
//输出文件格式
110 18661629496|13107702446|987654|18661629496|13107702446|987654|
120 1234567|2345678|1234567|2345678|
18661629496 2897839274|2897839274|
mapreduce程序编写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Test2 {
enum Counter
{
LINESKIP, //记录出错的行
}
public static class Map extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split( " " ); //18661629496,110
String anum = lineSplit[ 0 ];
String bnum = lineSplit[ 1 ];
//输出格式:110,18661629496
context.write( new Text(bnum), new Text(anum));
}
catch (ArrayIndexOutOfBoundsException e)
{
context.getCounter(Counter.LINESKIP).increment( 1 ); //出错时计数器+1
return ;
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String valueString;
String out= "" ;
for (Text value:values)
{
valueString=value.toString();
out+=valueString+ "|" ;
}
context.write(key, new Text(out));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length != 2 ) {
System.err.println( "请配置输入输出路径 " );
System.exit( 2 );
}
//各种配置
Job job = new Job(conf, "telephone " ); //作业名称配置
//类配置
job.setJarByClass(Test2. class );
job.setMapperClass(Map. class );
job.setReducerClass(Reduce. class );
//map输出格式配置
job.setMapOutputKeyClass(Text. class );
job.setMapOutputValueClass(Text. class );
//作业输出格式配置
job.setOutputKeyClass(Text. class );
job.setOutputValueClass(Text. class );
//增加输入输出路径
FileInputFormat.addInputPath(job, new Path(args[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
//任务完成时退出
System.exit(job.waitForCompletion( true ) ? 0 : 1 );
}
}
|
将mapreduce程序打包为jar文件:
1.右键项目名称->Export->java->jar file
2.配置jar文件存储位置
3.选择main class
4.运行jar文件
[liuqingjie@master hadoop-0.20.2]$ bin/hadoop jar /home/liuqingjie/test2.jar /user/liuqingjie/in /user/liuqingjie/out
15/05/14 01:46:47 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/05/14 01:46:47 INFO input.FileInputFormat: Total input paths to process : 2
15/05/14 01:46:48 INFO mapred.JobClient: Running job: job_201505132004_0005
15/05/14 01:46:49 INFO mapred.JobClient: map 0% reduce 0%
15/05/14 01:46:57 INFO mapred.JobClient: map 100% reduce 0%
15/05/14 01:47:09 INFO mapred.JobClient: map 100% reduce 100%
……………………………………………………………………………………
查看结果
[liuqingjie@master hadoop-0.20.2]$ bin/hadoop dfs -cat ./out/*
cat: Source must be a file.
110 18661629496|13107702446|987654|18661629496|13107702446|987654|
120 1234567|2345678|1234567|2345678|
18661629496 2897839274|2897839274|
评论暂时关闭