利用MultipleOutputs控制reduce输出路径
package com.mr.test; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.MultipleOutputs; public class WordCount { public static class MyMap implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } public void configure(JobConf arg0) { } public void close() throws IOException { // TODO Auto-generated method stub } } public static class MyReduce implements Reducer<Text, IntWritable, Text, IntWritable> { private MultipleOutputs mos; private JobConf jobconf; public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } mos.getCollector(key.toString(), reporter).collect(key, new IntWritable(sum)); // mos.getCollector(key.toString(),"test", reporter).collect(key, new IntWritable(sum)); } public void configure(JobConf jobconf) { mos = new 
MultipleOutputs(jobconf); this.jobconf = jobconf; } public void close() throws IOException { mos.close(); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MyMap.class); conf.setCombinerClass(MyReduce.class); conf.setReducerClass(MyReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); System.out.println("args1:"+args[0]); System.out.println("args2:"+args[1]); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); MultipleOutputs.addNamedOutput(conf, "test1", TextOutputFormat.class, LongWritable.class, Text.class); MultipleOutputs.addNamedOutput(conf, "test2", TextOutputFormat.class, LongWritable.class, Text.class); MultipleOutputs.addNamedOutput(conf, "test3", TextOutputFormat.class, LongWritable.class, Text.class); MultipleOutputs.addNamedOutput(conf, "test4", TextOutputFormat.class, LongWritable.class, Text.class); MultipleOutputs.addNamedOutput(conf, "test5", TextOutputFormat.class, LongWritable.class, Text.class); JobClient.runJob(conf); } }
input.txt:
test1
test2
test3
test4
test5
output:
-rw-r--r-- 2 test supergroup 0 2014-04-20 11:23 /chukwa/output/0419-10/_SUCCESS
drwxr-xr-x - test supergroup 0 2014-04-20 11:23 /chukwa/output/0419-10/_logs
-rw-r--r-- 2 test supergroup 42 2014-04-20 11:23 /chukwa/output/0419-10/part-00000.lzo
-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test1-m-00000.lzo
-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test2-m-00000.lzo
-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test3-m-00000.lzo
-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test4-m-00001.lzo
-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test5-m-00001.lzo
评论暂时关闭