在hadoop上进行编写mapreduce程序,统计关键词在text出现次数,hadoopmapreduce


mapreduce的处理过程分为2个阶段,map阶段,和reduce阶段。在要求统计指定文件中的所有单词的出现次数时,
map阶段把每个关键词写到一行上以逗号进行分隔,并初始化数量为1(相同的单词hadoop中的map会自动放到一行中)
reduce阶段是把每个单词出现的频率统计出来重新写回去。

如代码:

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	final Text key2 = new Text();
	// value2 表示单词在该行中的出现次数
	final IntWritable value2 = new IntWritable(1);
	// key 表示文本行的起始位置
	// value 表示文本行
	protected void map(LongWritable key, Text value, Context context)
			throws java.io.IOException, InterruptedException {
		final String[] splited = value.toString().split(",");
		for (String word : splited) {
			key2.set(word);
			// 把key2、value2写入到context中
			context.write(key2, value2);
		}
	}
}

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	// value3表示单词出现的总次数
	final IntWritable value3 = new IntWritable(0);
	/**
	 * key 表示单词 values 表示map方法输出的1的集合 context 上下文对象
	 */
	protected void reduce(Text key, java.lang.Iterable<IntWritable> values,
			Context context) throws java.io.IOException, InterruptedException {
		int sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		// 执行到这里,sum表示该单词出现的总次数
		// key3表示单词,是最后输出的key
		final Text key3 = key;
		// value3表示单词出现的总次数,是最后输出的value
		value3.set(sum);
		context.write(key3, value3);
	}
}

package com.clq.hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapperReducer {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
	        //指定输入和输出路径
		final String INPUT_PATH = "hdfs://ubuntu:9000/Input";
		final String OUTPUT_PATH = "hdfs://ubuntu:9000/output";
		//创建一个job对象封装运行时所需要的信息
		final Job job = new Job(new Configuration(),"MapperReducer");
		//打成jar执行
		job.setJarByClass(MapperReducer.class);
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		//指定自己自定义的mapper类
		job.setMapperClass(MyMapper.class);
		//指定运行mapper类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//指定自己定义的reducer类
		job.setReducerClass(MyReducer.class);
		//指定reducer的key和value类型
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.waitForCompletion(true);
		
		 
	}
}



Hadoop mapreduce程序问题

你需要的就是一个wordcout程序,可以参考hadoop的example包里面的wordcount例子。

你不需要两个key,按你的描述,将用户ID+网站url联合组成1个key,对应wordcount里面的word。最小粒度是一个用户访问一个网站的行为集-unitA,reduce出来的value就是unitA的流量。
 

【Hadoop】问如果我想把两个mapreduce程序顺序连接起来应该怎写程序

你可以自己设置输入输出路径,所以设置就行了。。。
example:
JobConf conf1 = new JobConf(YourClass.class);
//set configurations
...
//set inputformat
conf1.setInputFormat(SomeInputFormatExtendsFromInputFormat.class)
conf1.setOutputFormat(SomeOutputFormatExtendsFromOutputFormat.class)
//set input path
FileInputFormat.setInputPaths(conf1, "/your_input_dir");
FileOutputFormat.setOutputPaths(conf1, "/your_first_output_dir");
JobClient.runJob(conf1);
//at this point, the job should have finished. Use submitJob(conf1) to submit it asynchronisely.
JobConf conf2 = new JobConf();
//do the same for conf2, except the input path
FileInputFormat.setInputPaths(conf1, "/your_first_output_dir");
FileOutputFormat.setOutputPaths(conf1, "/your_first_input_dir");
JobClient.runJob(conf);
自己继承InputFormat, OutputFormat来定义合适的分割,读,写文件方式。mapreduce有一些实现好的,比如FileInputFormat, SequenceFileInputFormat。必要的时候读一下源代码,就清楚了。hadoop mapreduce 的最基本的文档见hadoop.apache.org/...l.html
 

相关内容