MapReduce编程实例(一)-求平均数


开始学习写一些MR编程实例,工作中即将使用(刚刚开始,如果有错误和建议,欢迎指出)

现在有一个文件,里面记录了全校所有学生各科成绩,求每个学生的平均成绩,格式如下

小明 语文 92
小明 数学 88
小明 英语 90
小强 语文 76
小强 数学 66
小强 英语 80
小木 语文 60
小木 数学 65
小木 英语 61


解决思路

Map阶段先将数据拆成key:姓名,value:课程_成绩的格式提供给reduce,默认的partitioner会将名字相同的学生发到同一个reduce上面

这样reduce可以根据总分/科目数计算平均成绩。

逻辑比较简单,

代码如下:

package com.test.mr2;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * 计算学生课程平均成绩(某学生总分/课程数)
 * 输入格式
 *
 * 小明 语文 92
 * 小明 数学 88
 * 小明 英语 90
 * 小强 语文 76
 * 小强 数学 66
 * 小强 英语 80
 * 小木 语文 60
 * 小木 数学 65
 * 小木 英语 61
 *
 * 输出
 *
 * 小明 90
 * 小强 74
 * 小木 62
 */
public class Average {

 public static class AverMapper extends Mapper<Object, Text, Text, Text> {
  @Override
  protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String line = value.toString();
   StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
   String name = "";
   StringBuffer out = new StringBuffer(32);
   while (stringTokenizer.hasMoreElements()) {
    String tmp = stringTokenizer.nextToken();
    StringTokenizer st = new StringTokenizer(tmp);
    while (st.hasMoreElements()) {
     name = st.nextToken();
     out.append(st.nextToken());
     out.append("_");
     out.append(st.nextToken());
     // 使用默认的hash partitioner将名字相同的同学发到一个reduce上
     context.write(new Text(name), new Text(out.toString()));
    }
   }
  }

 }

 public static class AverReducer extends
   Reducer<Text, Text, Text, FloatWritable> {
  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   Iterator<Text> it = values.iterator();
   //计算每个key对应的记录条数和总分数
   int count = 0;
   int sum = 0;
   while (it.hasNext()) {
    String value = it.next().toString();
    String[] strs = value.split("\\_");
    if (strs.length < 2) {
     continue;
    }
    try {
     sum += Integer.parseInt(strs[1]);
    } catch (Exception e) {
     System.err.println(e.getMessage());
    }
    count++;
   }
   FloatWritable average = new FloatWritable(sum / count);
   context.write(key, average);
  }
 }

 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  System.out.println("Begin.....");
  Configuration conf =new Configuration();
  String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
  if(arguments.length<2){
   System.out.println("Usage:com.test.mr2.Average in out");
   System.exit(1);
  }
  Job job=new Job(conf,"Average");
  job.setJarByClass(Average.class);
  job.setMapperClass(AverMapper.class);
  job.setReducerClass(AverReducer.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapOutputKeyClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(FloatWritable.class);
  FileInputFormat.addInputPath(job, new Path(arguments[0]));
  FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
  System.exit(job.waitForCompletion(true)?0:1);
  System.out.println("End.....");
 }

}

相关内容