hadoop之WordCount源码分析,hadoopwordcount


//最近在研究hadoop,第一个想要要开始研究的必定是wordcount程序了。看了《hadoop应用开发实战讲解》结合自己的理解,对wordcount的源码进行分析。
<pre name="code" class="java">

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class WordCount extends Configured implements Tool {
  
  /*
这个类实现mapper接口的map方法,输入的是文本总的每一行。利用StringTokenizer将字符串拆分成单词。然后将输出结果(word, 1)写入到OutputCollector中去
OutputCollector有hadoop框架提供,负责收集mapper和reducer的输出数据,实现map函数和reduce函数时。只需要将输出的<key,value>对向OutputCollector一丢即可,其余的事情框架会自己处理。
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
/*类中的LongWritable,  Text, IntWritable是hadoop中实现的用于封装Java数据类型的类,这些类都能够被串行化从而便于在分布式系统中进行数据交换,可以将它们等同的视为long,string,int的替代品
*/
    public void map(LongWritable key, Text value, 
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);//输出结果(word,1)
      }
    }
  }
  
  /*
	此类实现的是Reducer接口中的reduce方法,函数中的参数key.value是由mapper输出的中间结果,values是一个iterator(迭代器)
   */
  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, 
                       Reporter reporter) throws IOException {
      int sum = 0;
/*
遍历这个迭代器,就能够得到有相同的key的所有的value值。
此处的key是一个单词,而value则是词频
*/
      while (values.hasNext()) {
        sum += values.next().get();
      }
	//遍历后得到这个单词出现的总次数。
      output.collect(key, new IntWritable(sum));
    }
  }
  
  static int printUsage() {
    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");//输入输入路径
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  
  /*
	Wordcount 中map/reduce项目的主要驱动程序,调用此方法提交的map / reduce任务。在hadoop中一次计算任务成为一个job,可以通过以一个JobConf对象设置如何运行这个job,此处定义了输出的key 类型是text,而value的类型是IntWritable
   */
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");
 
    // key是text(words)
    conf.setOutputKeyClass(Text.class);
    // value是IntWritable (ints)
    conf.setOutputValueClass(IntWritable.class);
    
    conf.setMapperClass(MapClass.class);        
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    
    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
                         other_args.size() + " instead of 2.");
      return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        
    JobClient.runJob(conf);
    return 0;
  }
  
  
  public static void main(String[] args) throws Exception {
	/* ToolRunner的run方法开始,run方法有三个参数。第一个是Configuration类的实例,第二个是wordcount的实例,args则是从控制台接收到的命令行数组
	*/
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }

}









hadoop中wordcount的代码解释

这个不用管啦!是你运行wordcount程序时命令行参数的读取,不用每段代码都要弄得很清楚,知道它是干嘛的就行啦
 

hadoop中wordcount的代码,解释

System.exit是main的退出方法
()里面不是给明了。。job.waitForCompletion(true)
mapreduce的job完成了就退出
 

相关内容