MapReduce编程实战之“调试”



本篇内容


在上一篇的“初识”环节,我们已经在本地和Hadoop集群中,成功的运行了几个MapReduce程序,对MapReduce编程,已经有了最初的理解。

在本篇文章中,我们对MapReduce编程进行进一步的了解,包括:配置API、辅助类、调试手段、调优手段。

总体来说,我个人的理解是:

(1)本地开发阶段,对于Eclipse开发MapReduce程序来说,是不需要任何插件的,和开发普通的Java程序是一样的,通过DEBUG和单元测试排错;

(2)Hadoop环境测试阶段,也比较困难或者说比较麻烦进行远程调试,经常做的是打印语句,看日志。


配置API和辅助类


配置API


一个Configuration类的实例,代表配置属性及其取值的一个集合。maven项目的src/main/resources下有配置文件conf.xml,内容如下:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>color</name>
    <value>yellow</value>
    <description>Color</description>
  </property>
  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>
  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>
  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Sizeandwelght</description>
  </property>
</configuration>

如下代码,可以读取配置文件的内容:

import org.apache.hadoop.conf.Configuration;

public class ConfTest {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.addResource("conf.xml");
		//注意:系统属性的优先级高于源文件中设置的属性,前提是size在conf.xml中有设置,否则就是null了
		//对于这样写的,也可以用JVM参数 -Dproperty=value进行重新设置
		System.setProperty("size", "15");	
		
		System.out.println(conf.getInt("size", 0));	// 输出10
		System.out.println(conf.get("weight"));
		System.out.println(conf.get("size-weight"));	//输出15,heavy
	}
}

辅助类GenericOptionsParser/Tool/ToolRunner


GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,根据需要,为Configuration对象设置相应的取值。通常不直接使用它,而是使用继承自它的接口Tool:实现Tool接口,通过ToolRunner来运行程序,ToolRunner内部调研GenericOptionsParser,如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfTest extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new ConfTest(), args);
		System.exit(exitCode);
	}

	public int run(String[] arg0) throws Exception {
		Configuration conf = getConf();
		conf.addResource("conf.xml");
		System.out.println(conf.getInt("size", 0));
		System.out.println(conf.get("weight"));
		return 0;
	}
}

在Hadoop集群中,我们执行命令:hadoop jar test.jar ConfTest -Dsize=188,会看到屏幕输出的是188。

我们也可以指定一个配置文件:hadoop jar test.jar -conf conf/xx.xml 。



本地测试


在开发阶段保证MapReduce逻辑正确,比较常用的是写单元测试代码。

或者直接在Eclipse里面设置断点,进行DEBUG,会更加高效和直观。

如果MapReduce在Windows环境的Eclipse中不能运行的话,请参照这里:

http://blog.csdn.net/puma_dong/article/details/23711103#t3


在集群上测试


hadoop集群是分布式的,可能有成百上千的机器,在机器中进行作业调试是很困难的。一般来说,比较经典的办法是通过打印语句来调试程序。

我们把错误信息记录到标准错误中,同时更新任务状况(用reporter.setStatus()方法,但是这个功能在Hadoop2.2中貌似无效了),然后,在Web UI中,可以比较方便的看到这个错误日志。


用例程序代码


import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReporter {

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(MaxTemperatureReporter.class);
		conf.setJobName("Max Temperature");

		// FileInputFormat.addInputPaths(conf, new Path(args[0]));
		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		FileInputFormat.setInputPaths(conf, new Path("/test/input/t"));
		FileOutputFormat.setOutputPath(conf, new Path("/test/output/t"));

		conf.setMapperClass(MaxTemperatureMapperReporter.class);
		conf.setReducerClass(MaxTemperatureReduceReporter.class);

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		JobClient.runJob(conf);
	}
}

class MaxTemperatureMapperReporter extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING = 9999;

	enum Temperature {
		OVER_10
	}

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		String line = value.toString();
		String year = line.substring(15, 19);
		int airTemperature;
		if (line.charAt(87) == '+') {
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		try {
			Thread.sleep(100); // 让程序运行的慢一点,可以看到运行过程
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		if (airTemperature > 100) {
			System.err.println("Temperature over 10 degrees for input: "
					+ value);
			// 关于这个的作用,在Hadoop2.2的WebUI中貌似无效了
			reporter.setStatus("Detected possibly corrupt record: see logs.");
			// 这个就是符合某种情况的数据计数器,在WebUI可以看到统计
			reporter.incrCounter(Temperature.OVER_10, 1);
		}
		String quality = line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			output.collect(new Text(year), new IntWritable(airTemperature));
		}
	}
}

class MaxTemperatureReduceReporter extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int maxValue = Integer.MIN_VALUE;
		while (values.hasNext()) {
			maxValue = Math.max(maxValue, values.next().get());
		}
		output.collect(key, new IntWritable(maxValue));

	}
}


在Hadoop集群运行


运行命令:hadoop jar test.jar MaxTemperatureReporter

启动时的截图:


MapReduce执行完毕时的截图:


在MapReduce的执行过程中,也可以通过命令:

hadoop job -counter job_1397897643076_0010 'MaxTemperatureMapperReporter$Temperature' OVER_10 

即时查看我们定义的计数器的情况,但是最直观的情况,还是通过WebUI查看。


WebUI即时查看Job运行状况


WebUI地址:http://master:8088/cluster 。

(1)WebUI首页截图:


(2)Job列表界面截图:


(3)Job信息界面截图:


(4)Task列表界面截图:


(5)Task计数器界面截图:


(6)Task Attempts界面截图:


(7)Task Logs首页截图:


(8)Task错误日志截图:



集群测试小结


我的环境是在2个PC上分别作了2个RHEL6.2虚拟机,6600行数据,在集群上运行了10次,每次Hadoop都是在相同的节点上启动两个Map任务,同一个节点上启动一个Reduce任务,有时在node1上(物理机A的虚拟机),有时在nod2上(物理机B的虚拟机)。

当Job运行完毕后,点击history连接,会报错误连接,尚不知道原因。


远程调试


这种调试手段是通过设置一些属性,找到要进行处理的节点的task attempt,启动IsolationRunner,等待Eclipse连接调试。

这种手段稍后详细讲解。


作业调优


作业调优检查表




大部分的MapReduce作业,都是I/O密集型的,优化代码的CPU性能是没有意义的,为了保证所有调整都是有效的,应该在实际集群上对比新老执行时间。这实际也是很困难的,因为作业执行时间会随着其他作业的资源争夺和调度器决定的任务顺序不同而发生改变。为了在这类情况下得到较短的作业执行时间,必须不断运行(改变代码或不改变代码),并检查是否有明显的改进。

另外还有一种HPROF分析工具,也可以使用。


Map的数量


最终分片决定了Map的个数。Map任务的个数也能通过使用JobConf 的 conf.setNumMapTasks(int num)方法来手动地设置。

这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。


Reduce的数量


默认情况下,只有一个reducer,因此,也就只有一个分区,在这种情况下,partitioner操作将由于所有数据都已放入同一个分区而无关紧要了。

如果有很多reducer,了解HashPartitioner的作用就非常重要。假设键的散列函数足够好,那么记录将被均匀分到若干个reduce任务中,这样,具有相同键的记录将由同一个reduce任务进行处理。

单个reducer的默认配置对Hadoop新手而言很容易上手。真实的应用中,作业都把他设置成一个较大的数字,否则由于所有的中间数据都会放到一个reducer任务中,从而导致作业效率极低。

注意,在本地作业运行骑上运行时,只支持0个或1个reducer。

reducer最优个数与集群中可用的reducer任务槽数相关。总槽数有集群中节点数与每个节点的任务槽数相乘得到。该值由mapred.tasktracker.reduce.tasks.maxinum属性的值决定。

一个常用的方法是:设置比总槽数稍微少一些的reducer数,这回给reducer任务留有余地(容忍一些错误发生而不需要延长作业运行时间)。

如果reduce任务很大,比较明智的做法是使用更多的reducer,使得任务粒度更小,这样一来,任务的失败才不至于显著影响作业执行时间。

每个节点的任务槽数,就是指一个TaskTracker能够同时运行最多多少个map/reduce任务,默认是2,一般设为CPU个数的1-2倍。


内存/任务


在默认情况下,Hadoop为各个守护进程分配1000MB(1GB)内存。该值由hadoop-env.sh文件的HADOOP_HEAPSIZE参数控制。

此外,TaskTracker启动独立的子JVM以运行map和reduce任务。

因此,计算一个工作机器的最大内存需求时,需要综合考虑上述因素。

一个TaskTracker能够同时运行最多多少个map任务,由mapred.tasktracker.map.tasks.maximum属性控制,默认值是2个任务。

相应的,一个TaskTracker能够同时运行的最多reduce任务数由mapred.tasktracker.reduce.tasks.maximum属性控制,默认值也是2。

分配给每个子JVM的内存量由mapred.child.java.opts属性决定,默认值是-Xmx200m,表示每个任务分配200M内存。

综上所述,在默认情况下,一个工作机器会占用2800MB内存。

在一个TaskTracker上能够同时运行的任务数取决于一台机器有多少个处理器。

由于MapReduce作业通常是I/O-bound,因此将任务数设定为超出处理器数也有一定道理,能够获得更好的利用率。

至于到底需要运行多少个任务,则历来与相关作业的CPU使用情况,但经验法则则是(包括map和reduce任务)与处理器数的比值最好是1和2之间。

例如,假设客户拥有8个处理器,并计划在各处理器上分别跑2个进程,则可以将mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum的值分别设置7(考虑到还有datanode和tasktracker这两个进程,这两项值不可设置为8)。总内存开销井高达7600MB。


对于配备8GB物理内存的机器,该Java内存分配方案是否合理还取决于同时运行在这台机器上的其他进程。如果这台机器还运行着Streaming和Pipes程序等,由于无法为这些进程分配足够内存,这个分配方案并不合理(而且分配到子节点的内存将会减少)。此时,各进程在系统中不断切换,导致服务性能恶化。

精准的内存设置季度依赖于集群自身的特性。

可用使用一些工具监控集群的内存使用情况,以优化分配方案。例如Ganglia。

对于主节点来说,namenode、second namenode和jobtracker守护进程在默认情况下各使用1000MB内存,所以总计3000MB。

相关内容