Hadoop-2.4.1学习之Mapper和Reducer,hadoopmapper


       MapReduce允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和具有容错处理能力。程序员编写的运行在MapReduce上的应用程序称为作业(job),Hadoop既支持用Java编写的job,也支持其它语言编写的作业,比如Hadoop Streaming(shell、python)和Hadoop Pipes(c++)。Hadoop-2.X不再保留Hadoop-1.X版本中的JobTracker和TaskTracker组件,但这并不意味着Hadoop-2.X不再支持MapReduce作业,相反Hadoop-2.X通过唯一的主ResourceManager、每个节点一个的从NodeManager和每个应用程序一个的MRAppMaster保留了对MapReduce作业的向后兼容。在新版本中MapReduce作业依然由Map和Reduce任务组成,Map依然接收由MapReduce框架将输入数据分割为数据块,然后Map任务以完全并行的方式处理这些数据块,接着MapReduce框架对Map任务的输出进行排序,并将结果做为Reduce任务的输入,最后由Reduce任务输出最终的结果,在整个执行过程中MapReduce框架负责任务的调度,监控和重新执行失败的任务等。

      通常计算节点和存储节点是相同的,MapReduce框架会有效地将任务安排在存储数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过实现或者继承合适的接口或类提供了map和reduce函数,这两个函数负责Map任务和Reduce任务。作业客户端将编写好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager负责将作业分布到从节点上,调度和监控作业,为作业客户端提供状态和诊断信息。

      MapReduce框架只处理<key, value>键值对,也就是将作业的输入视为一些键值对并输出键值对。做为键值的类必须可以被MapReduce框架序列化,因此需要实现Writable接口,常用的IntWritable,LongWritable和Text都是实现该接口的类。做为键的类除了要实现Writable接口外,还需要实现WritableComparable接口,实现该接口主要为了有助于排序,上面提到的三个类也都实现了该接口。

      在简要介绍了MapReduce框架后,下面深入学习框架中的两个重要概念:Mapper和Reducer,正如上文提到了,它们组成了MapReduce作业并负责完成实际的业务逻辑处理。

      Mapper是独立的任务,将输入记录转换为中间记录,即对输入的键值对进行处理,并输出为一组中间键值对,输出的键值对使用context.write(WritableComparable, Writable)方法收集起来,中间记录的键值类型不必与输入记录的键值类型相同,实际上也往往是不同的。一条输入记录经由Mapper处理后可能输出为0条或者多条中间记录。比如,如果输入记录不满足业务要求(没有包含特定的值或者包含了特定的值)的话,可以直接返回,则会输出0条记录,此时Mapper起了过滤器的作用。

      接着MapReduce框架将与给定键相关联的所有中间值分组,然后传递给Reducer。用户可以通过Job.setGroupingComparatorClass(Class)方法指定Comparator来控制分组。Mapper的输出被排序然后按照Reducer分区,总的分区数与作业启动的Reducer任务数相同,程序员可以通过实现自定义的Partitioner控制输出的记录由哪个Reducer处理,默认使用的是HashPartitioner。程序员还可以通过Job.setCombinerClass(Class)指定一个combiner来执行中间输出的本地聚合,这有助于减少Mapper到Reducer的数据传输。Mapper的中间输出经过排序后总是保存为(key-len, key,value-len, value)的格式,应用程序可以通过Configuration控制是否将中间输出进行压缩,以及使用何种压缩方式,相关的几个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable, Writable, Context)处理该任务的价值对,应用程序可以覆盖cleanup(Context)方法实现任何需要的清理工作。

      MapReduce框架为每个由作业的InputFormat生成的InputSplit启动一个map任务,因此总的map任务数量由输入数据大小决定,更准确说是由输入文件总的块数决定。虽然可以为较少使用CPU的map任务在节点上设置300个map任务,但每个节点更适合并行运行10-100个map任务。由于任务的启动需要花费一些时间,所以任务的运行最好至少需要1分钟,因为如果任务运行的时间很少,整个作业的时间将大部分消耗在任务的建立上面。

      Reducer将具有相同键的一组中间值降低为一组更小数量的值,比如合并单词的数量等。一个作业启动的Reducer数量可以通过Job.setNumReduceTasks(int)或者mapred-site.xml中的参数mapreduce.job.reduces设置,但是更推荐前者,因为可以由程序员决定启动多少个reducer,而后者更多的是提供了一种默认值。程序员使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key, (list of values)>调用reduce(WritableComparable, Iterable<Writable>, Context)方法,同Mapper一样,程序员也可以覆盖cleanup(Context)方法指定需要的清理工作。

      Reducer的处理过程主要包括三个阶段:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取所有Mapper输出的相关分区。在Sort阶段,框架根据键分组Reducer的输入(不同的mapper可能输出相同的键)。Shuffle和sort是同时进行的,获取Mapper的输出后然后合并它们。在reduce阶段,调用reduce(WritableComparable, Iterable<Writable>处理<key, (list of values)>对。Reducer的输出通常通过Context.write(WritableComparable,Writable)写入文件系统,比如HDFS,当然也可以通过使用DBOutputFormat将输出写入数据库。Reducer的输出是未经排序的。

      如果不需要Reducer,可以使用Job.setNumReduceTasks(int)将Reducer的数量设置为0(如果不使用该方法设置Reducer的数量,由于mapreduce.job.reduces默认为1,会启动一个Reducer),在这种情况下,Mapper的输出将直接写入FileOutputFormat.setOutputPath(Job,Path)指定的路径中,并且MapReduce框架不会对Mapper的输出进行排序。

      如果在进行reduce之前想使用与分组中间键时不同的比较规则,可以通过Job.setSortComparatorClass(Class)指定不同的Comparator。也就是Job.setGroupingComparatorClass(Class)控制了如何对中间输出分组,而Job.setSortComparatorClass(Class)控制了在将数据传入reduce之前进行的第二次分组。

      不同于Mapper的数量由输入文件的大小确定,Reducer的数量可以由程序员明确设置,那么设置多少Reducer可以达到较好地效果呢?Reducer的数量范围为:(0.95 ~1.75 ) * 节点数量 * 每个节点上最大的容器数。参数yarn.scheduler.minimum-allocation-mb设置了每个容器可请求的最小内存,那么最大容器数可根据总的内存除以该参数计算得出。当使用0.75时,所有的Reducer会被立即加载,并当Mapper完成时开始传输Mapper的输出。使用1.75时,较快的节点将完成它们第一轮的任务,然后加载第二波任务,这样对负载平衡具有更好的效果。增加Reducer的数量虽然增加了框架开销,但增加了负载平衡和降低了失败的成本。上面的比例因子比总的Reducer数量稍微少些,以为预测执行的任务和失败的任务保留少量的Reducer槽,也就是实际的Reducer数量为上面公式得出的数量加上保留的Reducer数量。


hadoop怎跟踪看mapper与reducer的值

hadoop的mapreduce任务是在集群环境下跑的,所以调试存在一定的复杂度,但是貌似还是可以使用debug的,但是具体方式我没有实现,只是看到什么资料都有介绍。
如果只是想调试mapper和reducer的输入输出是否正确可以使用mrunit进行调试
 

一些hadoop上面 运行的java程序,要有编好mapper,跟reducer 函数我不会编在eclipse里面运行

package com.jeff.preDeal;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RandomMap extends Configured implements Tool {

public static class RandomMapper extends Mapper<Object, Text, Text, Text> {

private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
Random random =new Random();
word.set(Integer.toString(random.nextInt(100)));
context.write(word, value);
}

}

@Override
public int run(String[] arg0) throws Exception {
if (arg0.length != 2) {
System.err.println("Usage: pre-process url <in> <out>");
System.exit(2);
}
Job job = new Job(this.getConf(), "url run");
job.setJarByClass(RandomMap.class);
job.setMapperClass(RandomMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1)......余下全文>>
 

相关内容