MapReduce处理数据平均值与数值大小排行比较(mapreduce求平均值)
一:计算数据平均值
在map中将名称作为key 数据为value写出去
/* * 计算平均成绩 * 名字作为key 分数值为value写出去 */ public class AverageMap extends Mapper<LongWritable, Text, Text, IntWritable> { protected void map( LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws java.io.IOException, InterruptedException { String line = value.toString(); if (line.trim().length() > 0) { String[] str = line.split("\t"); if (str.length == 2) { context.write(new Text(str[0]), new IntWritable(Integer.valueOf(str[1]))); } } }; }
/*
 * Average-score computation, reduce stage.
 * Sums every score received for a name and divides by the actual number of
 * scores.
 *
 * Bug fix: the original divided by a hard-coded 2.0, which produces a wrong
 * average whenever a name has any number of scores other than two.
 */
public class AverageRedu extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
            throws IOException, InterruptedException {
        long sum = 0; // long: avoids int overflow when a key has many values
        int count = 0;
        for (IntWritable value : values) {
            sum += value.get();
            count++;
        }
        // A reduce group always has at least one value, but guard anyway so a
        // pathological empty group cannot divide by zero.
        if (count > 0) {
            context.write(key, new DoubleWritable(sum / (double) count));
        }
    }
}
/*
 * Driver for the average-score job.
 * Usage: AverageMain <input path> <output path>
 */
public class AverageMain {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: AverageMain <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf) constructor.
        Job job = Job.getInstance(conf, "average");
        job.setJarByClass(AverageMain.class);
        job.setMapperClass(AverageMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(AverageRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Propagate job success/failure to the shell via the exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
二:求最大最小值
求最大最小值(简单方式:将所有数据作为valueslist)
/* * 求最大最小值(简单方式:将所有数据作为valueslist) */ public class MaxminMap extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.trim().length() > 0) { context.write(new Text("key:"), new LongWritable(Long.parseLong(line))); } } }
public class MaxminRedu extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long max = Long.MIN_VALUE; long min = Long.MAX_VALUE; for (LongWritable val : values) { if (val.get() > max) { max = val.get(); } if (val.get() < min) { min = val.get(); } } context.write(new Text("Max"), new LongWritable(max)); context.write(new Text("Min"), new LongWritable(min)); } }
/*
 * Driver for the max/min job.
 * Usage: MaxminMain <input path> <output path>
 */
public class MaxminMain {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MaxminMain <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf) constructor.
        Job job = Job.getInstance(conf, "maxmin");
        job.setJarByClass(MaxminMain.class);
        job.setMapperClass(MaxminMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(MaxminRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Propagate job success/failure to the shell via the exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
三:求数据的topN排行
•setup()此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!
•cleanup()此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!
PS:大数据情况下,应首先在map端用定长数组对数据做一次局部top-N排序,减少传给reduce的数据量
/* * 求数据的topN排行 */ /* * 原始数据 * orderid,userid,payment,productid * 1,9818,100,121 * 2,8918,2999,22 * 3,2322,1234,11 * 4,343,2232,22 * 5,232,434,1 * 6,34,232,11 * 7,2322,9000,54 * 8,45,3454,34 */ /* * 预测结果(求TOP5-payment) * 1,9000 * 2, * 3, * 4, * 5, */ /* * 在map是将N排序 */ /* •setup(),此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高! •cleanup(),此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高! */ public class topNMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> { int len; // topN中的N int[] top; // top数组 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString().trim(); if (line.trim().length() > 0) { String str[] = line.split(","); if (str.length == 4) { int payment = Integer.parseInt(str[2]); // 将数据放入top[]数组中 add(payment); } } } private void add(int payment) { top[0] = payment; Arrays.sort(top); // 数组的升序排序 } // 初始化 @Override protected void setup( Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { len = context.getConfiguration().getInt("N", 5); // 默认为5 top = new int[len + 1]; // 以top[0]为比较标准,其后的5个数与top[0]比较。所以要(len+1)个大小 } @Override protected void cleanup( Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { for (int i = 1; i < len + 1; i++) { // 只需要取出top后面的(除第一个位置以外的)几个数 context.write(new IntWritable(top[i]), new IntWritable(top[i])); } } }
public class TopNRedu extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { int len; int[] top; @Override protected void reduce( IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { add(key.get()); } private void add(int payment) { top[0] = payment; Arrays.sort(top); // 数组的升序排序 } @Override protected void setup( Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { len = context.getConfiguration().getInt("N", 5); top = new int[len + 1]; } @Override protected void cleanup( Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { for (int i = len; i >= 1; i--) { // 降序排 //key为从到小的次序 //value为top[]数组的值 context.write(new IntWritable(len - i + 1), new IntWritable(top[i])); } } }
public class topNMain { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInt("N", Integer.parseInt(args[2])); // 设置N为第三个参数 Job job = new Job(conf); job.setJarByClass(topNMain.class); job.setMapperClass(topNMap.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(TopNRedu.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
评论暂时关闭