Hadoop学习之MapReduce(一)


在学习过了HDFS架构和Hadoop的配置管理后,现在学习MapReduce应用程序的编写和管理。首先简单介绍一下MapReduce框架。

MapReduce是一个易于编写程序的软件框架,这些应用程序以可靠的、容错的模式并行的运行在很大规模的商用硬件集群上(数以千计的节点),处理超大数量的数据(超过TB的数据集)。一个MapReduce作业通常将输入数据集分割为独立的数据块,这些数据块被map任务以完全并行的方式处理,MapReduce框架整理map任务的输出结果,然后map的输出结果做为reduce任务的输入。典型地,作业的输入和输出都存储在文件系统中。MapReduce框架处理调度任务,监控任务和重新执行失败的任务。

通常地,计算节点和存储节点是相同的节点,也就是MapReduce框架和HDFS运行在相同的节点集之上。这种配置方式允许MapReduce框架在数据存放的节点上有效地调度任务,导致集群中非常高的带宽传输率。MapReduce由单一的主JobTracker和每个集群节点上一个的从TaskTracker组成。主节点负责调度作业在从节点上的组件任务,监控任务和重新执行失败的任务,从节点执行由主节点指示的任务。

最低限度地,应用程序指定输入输出位置,通过实现恰当地的接口和(或者)抽象类提供map和reduce函数,这些信息和其它的作业参数组成了作业的配置管理(jobconfiguration)。Hadoop的作业客户端提交作业(jar文件或者可执行文件等)和JobTrackerde 配置信息,JobTracker开始承担分发软件/配置信息到从节点上,调度任务和监控任务,向作业客户端提供状态和诊断信息。

虽然Hadoop框架是用Java语言实现的,但是MapReduce应用程序不是必须用Java编写,还可以采用下面两种方式实现:

  • Hadoop Streaming。该方式允许用户创建和允许任何可执行文件(比如shell脚本)作为mapper和reducer。
  • Hadoop Pipes。该方式是SWIG,兼容c++ API实现MapReduce应用程序(该方式不是基于JNI的,即Java本地调用)

在简单地介绍了MapReduce框架后,通过一个例子直观地看看MapReduce应用是如何编写和运行的。MapReduce框架仅操作不同类型的<key, value>对,也就是MapReduce将作业的输入看做<key, value>对的集合,并且产生<key, value>对集合作为作业的输出。作为Key和Value的类必须可以被框架序列化,因此需要实现org.apache.hadoop.io.Writable接口,另外Key还必须实现org.apache.hadoop.io.WritableComparable接口便于被框架排序。

 一个MapReduce作业的输入和输出过程如下:


其中<k1,v1>作为输入,输出为<k3,v3>,map的输出<k2,v2>作为reduce的输入。

现在开始MapReduce应用程序之旅吧,例子的名称为WordCount,用于统计输入集合中每个单词的出现次数,该应用程序可以运行在单机模式、伪分布模式和全分布模式下。下面是WordCount的源代码:

package org.learning;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{
 	
 	    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
 	      private final static IntWritable one = new IntWritable(1);
 	      private Text word = new Text();
 	
 	      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 	        String line = value.toString();
 	        StringTokenizer tokenizer = new StringTokenizer(line);
 	        while (tokenizer.hasMoreTokens()) {
 	          word.set(tokenizer.nextToken());
 	          context.write(word, one);
 	        }
 	      }
 	    }
 	
 	    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
 	      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 	    	 int sum = 0;
 	        for(IntWritable value : values){
 	        	sum += value.get();
 	        }

 	       context.write(key, new IntWritable(sum));
 	      }
 	    }
 
		@Override
		public int run(String[] as) throws Exception {
			Configuration conf = getConf();
			Job job = new Job(conf, "wordCount");
			job.setJarByClass(WordCount.class);
			
			Path in = new Path(as[0]);
			Path out =  new Path(as[1]);
			FileInputFormat.setInputPaths(job, in);
			FileOutputFormat.setOutputPath(job, out);
			
			job.setMapperClass(Map.class);
			job.setReducerClass(Reduce.class);
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			
			System.exit(job.waitForCompletion(true)?0:1);
			return 0;
		}
		
		public static void main(String[] args) throws Exception {
	 	      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
	 	      System.exit(res);
	    }
	}

看过源代码后,使用java命令将WordCount打包为jar文件,分别执行如下的命令:

[gpadmin@pgmdw ~]$ mkdir wordcount_classes
[gpadmin@pgmdw ~]$ javac -classpath ${HADOOP_HOME}/hadoop-core-1.2.1.jar -d wordcount_classes WordCount.java
[gpadmin@pgmdw ~]$ jar -cvf wordcount.jar -C wordcount_classes/ .

在HDFS创建存放输入文件的目录:

[gpadmin@pgmdw ~]$ hadoop fs -mkdir input
[gpadmin@pgmdw ~]$ hadoop fs -ls
Found 1 items
drwxr-xr-x   - gpadmin supergroup          0 2014-03-11 16:22 /user/gpadmin/input

使用cat选项查看hello文件的内容:

[gpadmin@pgmdw ~]$ hadoop fs -cat input/hello
hello master hello master hello slave bye

运行WordCount作业:

[gpadmin@pgmdw ~]$ hadoop jar wordCount.jar org.learning.WordCount input/hello output

查看输出结果:

[gpadmin@pgmdw ~]$ hadoop fs -cat output/part-r-00000
bye     1
hello   3
master  2
slave   1
应用程序可以使用-files选项指定用逗号分隔的路径列表,该路径将出现在任务的当前工作目录中,-libjars允许应用程序向map和reduce任务的classpath添加jar文件,-archives允许向应用程序传递逗号分隔的归档文件列表作为参数,这些归档文件是没有解压的,并且在任务的当前工作目录中创建以归档文件名称为名称的链接。下面看看使用这几个选项的WordCount例子:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
WordCount应用程序是比较直观的。通过job.setInputFormatClass(TextInputFormat.class)指定了map函数每次处理文件中的一行,然后将每行的文本使用空格分隔为若干字符,将每个字符组成键值对<word,1>,这些键值对将作为reduce的输入。Reducer中的reduce函数将每个单词出现的次数进行累加求和。Run方法中指定了作业的方面,比如输入输出路径(作为参数从命令行传递进来),键值的类型,输入输出的格式等


相关内容

    暂无相关文章