hadoop中文分词、词频统计及排序,


需求如下:

有如图所示的输入文件。其中第一列代表ip地址,之后的偶数列代表搜索词,数字(奇数列)代表搜索次数,使用"\t"分隔。现在需要对搜索词进行分词并统计词频,此处不考虑搜索次数,可能是翻页,亦不考虑搜索链接的行为。




这里中文分词使用了IK分词包,直接将源码放入src中。感谢IK分词。

程序如下:

<span style="font-size:14px;">package seg;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;


/**
 * @author zhf 
 * @version 创建时间:2014年8月16日 下午3:04:40
 */
public class SegmentTool extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new SegmentTool(), args);
		System.exit(exitCode);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = new Configuration();
		String[] args = new GenericOptionsParser(conf,arg0).getRemainingArgs();
		if(args.length != 2){
			System.err.println("Usage:seg.SegmentTool <input> <output>");
			System.exit(2);
		}
		Job job = new Job(conf,"nseg.jar");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1])))
			fs.delete(new Path(args[1]),true);
		job.setJarByClass(SegmentTool.class);
		job.setMapperClass(SegmentMapper.class);
		job.setCombinerClass(SegReducer.class);
		job.setReducerClass(SegReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static class SegmentMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
		private IKSegmenter iks = new IKSegmenter(true);
		private Text word = new Text();	
		private final static IntWritable one = new IntWritable(1);
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String line = value.toString().trim();
			String[] str = line.split("\t");
			for(int i=1;i<str.length;i+=2){
				String tmp = str[i];
				if(tmp.startsWith("http"))
					continue;
				List<String> list = segment(tmp);
				for(String s : list){
					word.set(s);
					context.write(word, one);
				}
			}
		}
		private List<String> segment(String str) throws IOException{
			byte[] byt = str.getBytes();
			InputStream is = new ByteArrayInputStream(byt);
			Reader reader = new InputStreamReader(is);
			iks.reset(reader);
			Lexeme lexeme;
			List<String> list = new ArrayList<String>();
			while((lexeme = iks.next()) != null){
				String text = lexeme.getLexemeText();
				list.add(text);
			}
			return list;
		}
	}
	public static class SegReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
		private IntWritable result = new IntWritable();
		public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
			int sum = 0;
			for(IntWritable val : values)
				sum += val.get();
			result.set(sum);
			context.write(key, result);
		}
	}

}</span>

使用的hadoop环境为:Hadoop 2.3.0-cdh5.0.0。需要引入三个hadoop相关的jar : hadoop-mapreduce-client-core-2.0.0-cdh4.6.0.jar、hadoop-common-2.0.0-cdh4.6.0.jar、commons-cli-1.2.jar。

打包后,执行命令:yarn jar seg.jar seg.SegmentTool /test/user/zhf/input /test/user/zhf/output

输出结果部分如下:

<span style="font-size:18px;">阿迪达斯        1
附近    2
陈      22
陈乔恩  1
陈奕迅  1
陈毅    2
限额    4
陕西    4
除个别  1
隐私    1
隔壁    1
集成    4
集锦    1
雨中    2
雪      5
露      1
青      7
青岛    2</span>

但是并没有排序,如果数据量比较小,可以采用linux命令:sort -k2 -n -r kw_result.txt > kw_freq.txt进行排序。

数据量大的话,可以将结果导入Hive,因为只有两列了,hive -e "select key,count from kw_table sort by count desc;" > kw_freq.txt 即可得到有序的结果。

亦可以将之前的ouput作为下一个job的input,实现排序。需要反转map输出的key和value。

代码如下:

<span style="font-size:14px;">package seg;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author zhf 
 * @version 创建时间:2014年8月16日 下午4:51:00
 */
public class SortByFrequency extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new SortByFrequency(), args);
		System.exit(exitCode);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = new Configuration();
		String[] args = new GenericOptionsParser(conf,arg0).getRemainingArgs();
		if(args.length != 2){
			System.err.println("Usage:seg.SortByFrequency <input> <output>");
			System.exit(2);
		}
		Job job = new Job(conf,"nseg.jar");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1])))
			fs.delete(new Path(args[1]),true);
		job.setJarByClass(SortByFrequency.class);
		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);
		job.setSortComparatorClass(DescComparator.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static class SortMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String str[] = value.toString().split("\t");
			context.write(new IntWritable(Integer.valueOf(str[1])), new Text(str[0]));
		}
	}
	public static class SortReducer extends Reducer<IntWritable,Text,Text,IntWritable>{
		private Text result = new Text();
		public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			for(Text val : values){
				result.set(val);
				context.write(result, key);
			}
		}
	}
	public static class DescComparator extends WritableComparator{

		protected DescComparator() {
			super(IntWritable.class,true);
		}

		@Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
		}
		@Override
		public int compare(Object a,Object b){
			return -super.compare(a, b);
		}
	}
}</span>

head查看的结果如下:

的      175
上海    158
上      85
都市    76
在      71
ppt     64
运输    58
电视    58
式      58
2       52





相关内容