Hadoop WordCount Code


Here is a WordCount example that beginners can use as a reference. It is implemented with the old (org.apache.hadoop.mapred) API.

The full program is pasted below.

1. The Program

package WordCount;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
 
  public static class WCMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * Tokenizes each input line and emits every word as a
     * key-value pair of the form (word, 1).
     */
    public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class WCReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Sums up the counts for each word.
     */
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  @Override
  public int run(String[] paths) throws Exception {

    Configuration conf = getConf();
    JobConf job = new JobConf(conf, WordCount.class);
    Path in = new Path(paths[0]);
    Path out = new Path(paths[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);

    job.setJobName("ajl word count");
    job.setMapperClass(WCMapper.class);
    job.setReducerClass(WCReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);

    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    long startT = System.currentTimeMillis();
    String inputDir = "hdfs://xxx/mr/WordCount/input";
    String outputDir = "hdfs://xxx/mr/WordCount/output";
    String[] paths = { inputDir, outputDir };
    int exitCode = ToolRunner.run(new Configuration(), new WordCount(), paths);

    System.out.println("MR job took: " + ((System.currentTimeMillis() - startT) / 1000 / 60) + " minutes");
    System.exit(exitCode);
  }

 
}
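
Since the version above uses the old mapred API, here is a minimal sketch of the same job written against the new org.apache.hadoop.mapreduce API, for comparison. The class name NewApiWordCount is my own; on Hadoop 2.x and later you would use Job.getInstance(conf, name) instead of the deprecated Job constructor.

package WordCount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiWordCount {

  public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Tokenize the line and emit (word, 1) for every token.
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Sum all the 1s emitted for this word.
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count (new API)"); // Job.getInstance(conf, ...) on 2.x+
    job.setJarByClass(NewApiWordCount.class);
    job.setMapperClass(WCMapper.class);
    // Summing is associative, so the reducer can double as a combiner;
    // the old API offers the same thing via JobConf.setCombinerClass.
    job.setCombinerClass(WCReducer.class);
    job.setReducerClass(WCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir from the command line
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir from the command line
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}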


2. Preparation


Point the input and output directories in main at your own HDFS addresses.

Input:

smile
smile  smile
allen
123

111


Save the input to a file and upload it to HDFS with: hadoop fs -put wordcount.txt /mr/WordCount/input/wordcount.txt
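
If the input directory does not exist yet, you can create it first and verify the upload; a minimal sequence, assuming the same paths as in main (on Hadoop 2.x use hadoop fs -mkdir -p to create parent directories):

hadoop fs -mkdir /mr/WordCount/input
hadoop fs -cat /mr/WordCount/input/wordcount.txt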

The results are written to the output directory.


3. Running

You can run the job directly in Eclipse (usually for debugging), or package it into a jar, copy it to the namenode, and launch it from the command line, as shown below.
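
For the jar route, a typical invocation looks like the following; the jar name wordcount.jar is an assumption, and the main class is WordCount.WordCount per the package declaration above. Note that main() here ignores command-line arguments, since the input and output paths are hardcoded.

hadoop jar wordcount.jar WordCount.WordCount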


Output:

111 1
123 1
allen 1
smile 3
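
To view the result file on HDFS (with the old API and a single reducer the output file is named part-00000):

hadoop fs -cat /mr/WordCount/output/part-00000

Before re-running the job, delete the output directory first (e.g. hadoop fs -rmr /mr/WordCount/output), since FileOutputFormat refuses to write to a path that already exists.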



Console log from a run inside Eclipse:

14/03/23 12:51:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/23 12:51:11 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/03/23 12:51:11 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/23 12:51:11 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/23 12:51:11 INFO mapred.JobClient: Running job: job_local_0001
14/03/23 12:51:11 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/03/23 12:51:11 INFO mapred.MapTask: numReduceTasks: 1
14/03/23 12:51:11 INFO mapred.MapTask: io.sort.mb = 100
14/03/23 12:51:11 INFO mapred.MapTask: data buffer = 79691776/99614720
14/03/23 12:51:11 INFO mapred.MapTask: record buffer = 262144/327680
14/03/23 12:51:11 INFO mapred.MapTask: Starting flush of map output
14/03/23 12:51:11 INFO mapred.MapTask: Finished spill 0
14/03/23 12:51:11 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/03/23 12:51:12 INFO mapred.JobClient:  map 0% reduce 0%
14/03/23 12:51:14 INFO mapred.LocalJobRunner: hdfs://haier001:9099/mr/WordCount/input/wordcount.txt:0+36
14/03/23 12:51:14 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/03/23 12:51:14 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/03/23 12:51:14 INFO mapred.LocalJobRunner: 
14/03/23 12:51:14 INFO mapred.Merger: Merging 1 sorted segments
14/03/23 12:51:14 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 70 bytes
14/03/23 12:51:14 INFO mapred.LocalJobRunner: 
14/03/23 12:51:14 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/03/23 12:51:14 INFO mapred.LocalJobRunner: 
14/03/23 12:51:14 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/03/23 12:51:14 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://haier001:9099/mr/WordCount/output
14/03/23 12:51:15 INFO mapred.JobClient:  map 100% reduce 0%
14/03/23 12:51:17 INFO mapred.LocalJobRunner: reduce > reduce
14/03/23 12:51:17 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/03/23 12:51:18 INFO mapred.JobClient:  map 100% reduce 100%
14/03/23 12:51:18 INFO mapred.JobClient: Job complete: job_local_0001
14/03/23 12:51:18 INFO mapred.JobClient: Counters: 20
14/03/23 12:51:18 INFO mapred.JobClient:   File Input Format Counters 
14/03/23 12:51:18 INFO mapred.JobClient:     Bytes Read=36
14/03/23 12:51:18 INFO mapred.JobClient:   File Output Format Counters 
14/03/23 12:51:18 INFO mapred.JobClient:     Bytes Written=28
14/03/23 12:51:18 INFO mapred.JobClient:   FileSystemCounters
14/03/23 12:51:18 INFO mapred.JobClient:     FILE_BYTES_READ=392
14/03/23 12:51:18 INFO mapred.JobClient:     HDFS_BYTES_READ=72
14/03/23 12:51:18 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=82876
14/03/23 12:51:18 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=28
14/03/23 12:51:18 INFO mapred.JobClient:   Map-Reduce Framework
14/03/23 12:51:18 INFO mapred.JobClient:     Map output materialized bytes=74
14/03/23 12:51:18 INFO mapred.JobClient:     Map input records=5
14/03/23 12:51:18 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/03/23 12:51:18 INFO mapred.JobClient:     Spilled Records=12
14/03/23 12:51:18 INFO mapred.JobClient:     Map output bytes=56
14/03/23 12:51:18 INFO mapred.JobClient:     Total committed heap usage (bytes)=1065484288
14/03/23 12:51:18 INFO mapred.JobClient:     Map input bytes=36
14/03/23 12:51:18 INFO mapred.JobClient:     SPLIT_RAW_BYTES=106
14/03/23 12:51:18 INFO mapred.JobClient:     Combine input records=0
14/03/23 12:51:18 INFO mapred.JobClient:     Reduce input records=6
14/03/23 12:51:18 INFO mapred.JobClient:     Reduce input groups=4
14/03/23 12:51:18 INFO mapred.JobClient:     Combine output records=0
14/03/23 12:51:18 INFO mapred.JobClient:     Reduce output records=4
14/03/23 12:51:18 INFO mapred.JobClient:     Map output records=6
MR job took: 0 minutes






