MapReduce: Computing Averages, Finding Max/Min, and Ranking Top-N Values


Part 1: Computing an Average

In the map phase, emit the name as the key and the score as the value.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Compute average scores.
 * The map emits the name as the key and the score as the value.
 */
public class AverageMap extends Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.trim().length() > 0) {
			// Each input line is expected to be "name<TAB>score".
			String[] str = line.split("\t");
			if (str.length == 2) {
				context.write(new Text(str[0]),
						new IntWritable(Integer.parseInt(str[1])));
			}
		}
	}
}


 

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageRedu extends
		Reducer<Text, IntWritable, Text, DoubleWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		// Sum and count the scores, so the average is correct for any number
		// of scores per name (the original divided by a hardcoded 2.0).
		int sum = 0;
		int count = 0;
		for (IntWritable value : values) {
			sum += value.get();
			count++;
		}
		context.write(key, new DoubleWritable((double) sum / count));
	}
}
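Note that this reducer cannot double as a combiner: an average of partial averages is not the overall average. For example, the average of 1, 2, 3 is 2, but averaging the partial results avg(1, 2) = 1.5 and avg(3) = 3 gives 2.25. A combiner for an average job would have to forward partial (sum, count) pairs instead of averages.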


 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Job.getInstance() replaces the deprecated new Job(conf) constructor.
		Job job = Job.getInstance(conf, "average");
		job.setJarByClass(AverageMain.class);

		job.setMapperClass(AverageMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setReducerClass(AverageRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
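For reference, here is what a run might look like on a small hypothetical input (the names and scores are made up; the file is tab-separated, matching the split("\t") in the mapper):

zhangsan	88
lisi	97
zhangsan	78
lisi	89

Expected output:

lisi	93.0
zhangsan	83.0

The job would be launched with something along the lines of hadoop jar average.jar AverageMain /input/scores /output/average, where the jar name and paths are placeholders.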


 

Part 2: Finding the Maximum and Minimum

Find the maximum and minimum (simple approach: emit every value under one constant key, so a single reducer sees the whole values list).

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Find the maximum and minimum (simple approach: emit every value under
 * one constant key, so a single reducer receives the whole values list).
 */
public class MaxminMap extends Mapper<LongWritable, Text, Text, LongWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.trim().length() > 0) {
			// The constant key routes all values into a single reduce call.
			context.write(new Text("key:"),
					new LongWritable(Long.parseLong(line.trim())));
		}
	}
}


 

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxminRedu extends Reducer<Text, LongWritable, Text, LongWritable> {
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		// One pass over the whole values list, tracking both extremes.
		long max = Long.MIN_VALUE;
		long min = Long.MAX_VALUE;
		for (LongWritable val : values) {
			if (val.get() > max) {
				max = val.get();
			}
			if (val.get() < min) {
				min = val.get();
			}
		}
		context.write(new Text("Max"), new LongWritable(max));
		context.write(new Text("Min"), new LongWritable(min));
	}
}
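Because every record is shuffled to a single key, the network transfer and the lone reduce call become the bottleneck on large inputs. A combiner can shrink each map task's output to just its two local extremes before the shuffle; the following is a sketch (the class name MaxminCombiner is my own, not part of the original code):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxminCombiner extends
		Reducer<Text, LongWritable, Text, LongWritable> {
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		long max = Long.MIN_VALUE;
		long min = Long.MAX_VALUE;
		for (LongWritable val : values) {
			max = Math.max(max, val.get());
			min = Math.min(min, val.get());
		}
		// Forward only the local extremes under the same key; the global
		// max/min of these candidates equals the global max/min of the data.
		context.write(key, new LongWritable(max));
		context.write(key, new LongWritable(min));
	}
}

Wiring it in takes one extra line in the driver: job.setCombinerClass(MaxminCombiner.class);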


 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxminMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "maxmin");
		job.setJarByClass(MaxminMain.class);

		job.setMapperClass(MaxminMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setReducerClass(MaxminRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
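On a hypothetical input with one number per line, such as

3
9000
17
454

the job would produce:

Max	9000
Min	3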

 

Part 3: Ranking the Top N Values

• setup() is invoked exactly once per task by the MapReduce framework, before the first map() call. It is the place to centralize initialization of variables and resources; doing that work inside map() instead would repeat it for every input line and hurt throughput.

• cleanup() is invoked exactly once per task, after the last map() call, and is the place to release those variables and resources. Releasing them inside map() would force a release after every line and a re-initialization before the next, again wasting work. A minimal sketch of this lifecycle follows.
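The sketch below is illustrative only (the class name, the field.delimiter key, and the record counter are made up); it shows where once-per-task work belongs relative to per-record work:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LifecycleMap extends Mapper<LongWritable, Text, Text, LongWritable> {
	private String delimiter; // initialized once per task
	private long records;     // accumulated across all map() calls

	@Override
	protected void setup(Context context) {
		// Runs once, before the first map() call.
		delimiter = context.getConfiguration().get("field.delimiter", "\t");
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Runs once per record; no per-record setup cost.
		records++;
		context.write(new Text(value.toString().split(delimiter)[0]),
				new LongWritable(1));
	}

	@Override
	protected void cleanup(Context context)
			throws IOException, InterruptedException {
		// Runs once, after the last map() call.
		context.write(new Text("records"), new LongWritable(records));
	}
}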

PS: On large data sets, do a first round of ranking inside each map task, so each mapper forwards only its own top N candidates to the reducer.

/*
 * Rank the top N values.
 */
/*
 * Input data
 * orderid,userid,payment,productid
 * 1,9818,100,121
 * 2,8918,2999,22
 * 3,2322,1234,11
 * 4,343,2232,22
 * 5,232,434,1
 * 6,34,232,11
 * 7,2322,9000,54
 * 8,45,3454,34
 */
/*
 * Expected result (top 5 by payment)
 * 1,9000
 * 2,3454
 * 3,2999
 * 4,2232
 * 5,1234
 */
/*
 * Maintain a sorted top-N array inside each map task.
 */
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class topNMap extends
		Mapper<LongWritable, Text, IntWritable, IntWritable> {
	int len;   // the N in top-N
	int[] top; // top[1..len] hold the current top N; top[0] is a scratch slot

	// Initialization, run once per map task.
	@Override
	protected void setup(Context context)
			throws IOException, InterruptedException {
		len = context.getConfiguration().getInt("N", 5); // defaults to 5
		// One extra slot: new values are written into top[0], and a sort
		// pushes them up into the top N if they are large enough.
		top = new int[len + 1];
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() > 0) {
			String[] str = line.split(",");
			if (str.length == 4) {
				int payment = Integer.parseInt(str[2]);
				// Offer the payment to the top[] array.
				add(payment);
			}
		}
	}

	private void add(int payment) {
		// top[0] is sacrificial: after overwriting it and re-sorting in
		// ascending order, top[1..len] again hold the largest values seen.
		// Values no bigger than the current top[0] can never make the cut.
		if (payment > top[0]) {
			top[0] = payment;
			Arrays.sort(top);
		}
	}

	@Override
	protected void cleanup(Context context)
			throws IOException, InterruptedException {
		// Emit only this task's top len candidates, skipping the scratch slot.
		for (int i = 1; i <= len; i++) {
			context.write(new IntWritable(top[i]), new IntWritable(top[i]));
		}
	}
}


 

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNRedu extends
		Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
	int len;
	int[] top;

	@Override
	protected void setup(Context context)
			throws IOException, InterruptedException {
		len = context.getConfiguration().getInt("N", 5);
		top = new int[len + 1];
	}

	@Override
	protected void reduce(IntWritable key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		// Iterate over the values so that a payment emitted by several map
		// tasks is counted once per occurrence, not collapsed into one key.
		for (IntWritable value : values) {
			add(value.get());
		}
	}

	private void add(int payment) {
		// Same sacrificial-slot trick as in the mapper.
		if (payment > top[0]) {
			top[0] = payment;
			Arrays.sort(top);
		}
	}

	@Override
	protected void cleanup(Context context)
			throws IOException, InterruptedException {
		// Walk the array from largest to smallest: the key is the rank
		// (1..len), the value is the payment at that rank.
		for (int i = len; i >= 1; i--) {
			context.write(new IntWritable(len - i + 1), new IntWritable(top[i]));
		}
	}
}


 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class topNMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.setInt("N", Integer.parseInt(args[2])); // N is the third argument
		Job job = Job.getInstance(conf, "topN");
		job.setJarByClass(topNMain.class);

		job.setMapperClass(topNMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setReducerClass(TopNRedu.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
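A run over the sample orders above would look something like hadoop jar topn.jar topNMain /input/orders /output/top5 5, where the jar name and paths are placeholders.

As an aside, the sacrificial-slot array can also be replaced by a bounded min-heap, which avoids re-sorting the whole array on every insert (O(log N) per value instead of O(N log N)). A minimal sketch of the idea, not part of the original code:

import java.util.PriorityQueue;

// Keeps the N largest values seen so far in a min-heap of size N.
public class TopNHeap {
	private final int n;
	private final PriorityQueue<Integer> heap = new PriorityQueue<>();

	public TopNHeap(int n) {
		this.n = n;
	}

	public void add(int value) {
		heap.offer(value);
		if (heap.size() > n) {
			heap.poll(); // evict the smallest, keeping the N largest
		}
	}
}

The same add() logic could back both topNMap and TopNRedu in place of the int[] array.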



 
