Java MapReduce详解--(3),mapreduce详解


如果Hadoop命令是以类名作为第一个参数,它就会启动一个JVM来运行这个类。使用命令比直接使用Java更方便,因为前者把类的路径(及其依赖关系)加入Hadoop的库中,并获得Hadoop的配置。要添加应用程序类的路径,我们需要定义一个HADOOP_CLASSPATH环境变量,Hadoop脚本会来执行相关操作。

注意:以本地(独立)模式运行时,本书所有程序希望都以这种方式来设置HADOOP_CLA-SSPATH。命令必须在示例代码所在的文件夹下被运行。

运行作业所得到的输出提供了一些有用的信息。(无法找到作业JAR文件的相关信息是意料之中的,因为我们是在本地模式下没有JAR的情况下运行的。在集群上运行时,不会看到此警告。)例如,我们可以看到,这个作业被给予了一个IDjob_local_0001,并且它运行了一个map任务和一个reduce任务(使用attempt_local_0001_m_000000_0attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID是非常有用的。

输出的最后一部分叫"计数器"(Counter),显示了在Hadoop上运行的每个作业产生的统计信息。这些对检查处理的数据量是否符合预期非常有用。例如,我们可以遵循整个系统中记录的数目:5map输入产生了5map的输出,然后5reduce输入产生两个reduce输出。

输出被写入output目录,其中每个reducer包括一个输出文件。作业包含一个reducer,所以我们只能找到一个文件,名为part-00000

1.catoutput/part-00000

2.1949 111

3.1950 22

这个结果和我们之前手动寻找的结果一样。我们可以把前面这个结果解释为在1949年的最高气温记录为11.1℃,而在1950年为2.2℃。

新的JavaMapreduce API

Hadoop最新版JavaMapReduce Release 0.20.0API包括一个全新的MapReduce JavaAPI,有时也称为"contextobject"(上下文对象),旨在使API在未来更容易扩展。新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用。

新的API和旧的API之间有下面几个明显的区别。

新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,MapperReducer是抽象类。

新的API是在org.apache.hadoop.mapreduce(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。

新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConfOutputCollectorReporter的角色。

新的API同时支持""""式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer""式的一个有用的例子是分批处理记录,而不是一个接一个。

新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展(用于配置守护进程,请参见5.1)。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。

作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

2-6使用新API重写了MaxTemperature的代码,不同之处用黑体字突出显示。

2-6:使用新的context object(上下文对象)MapReduce API在气象数据集中查找最高气温

1.public class NewMaxTemperature {

2.static class NewMaxTemperatureMapper

3.extends Mapper<LongWritable,Text, Text, IntWritable> {

4.

5.private static final int MISSING = 9999;

6.

7.public void map(LongWritable key, Text value,Context context)

8.throws IOException, InterruptedException {

9.

10. String line =value.toString();

11. String year =line.substring(15, 19);

12. int airTemperature;

13. if (line.charAt(87) == '+') { // parseInt doesn't like

14. leading plus signs

15. airTemperature =Integer.parseInt(line.substring(88, 92));

16. } else {

17. airTemperature =Integer.parseInt(line.substring(87, 92));

18. }

19. String quality =line.substring(92, 93);

20. if (airTemperature !=MISSING &&quality.matches("[01459]")){

21. context.write(new Text(year), new

22. IntWritable(airTemperature));

23. }

24. }

25.}

26.

27.static class NewMaxTemperatureReducer

28. extends Reducer<Text, IntWritable, Text,IntWritable> {

29.

30. public void reduce(Text key, Iterable<IntWritable> values,

31. Context context)

32. throws IOException, InterruptedException {

33.

34. int maxValue = Integer.MIN_VALUE;

35. for (IntWritable value : values) {

36. maxValue =Math.max(maxValue, value.get());

37. }

38. context.write(key, new IntWritable(maxValue));

39. }

40.}

41.

42.public static void main(String[] args) throws Exception {

43. if (args.length != 2) {

44. System.err.println("Usage: NewMaxTemperature <input path>

45. <output path>");

46. System.exit(-1);

47. }

48.

49. Job job =new Job();

50. job.setJarByClass(NewMaxTemperature.class);

51.

52. FileInputFormat.addInputPath(job, new Path(args[0]));

53. FileOutputFormat.setOutputPath(job, new Path(args[1]));

54.

55. job.setMapperClass(NewMaxTemperatureMapper.class);

56. job.setReducerClass(NewMaxTemperatureReducer.class);

57.

58. job.setOutputKeyClass(Text.class);

59. job.setOutputValueClass(IntWritable.class);

60.

61. System.exit(job.waitForCompletion(true) ? 0 : 1);

62.

63. }

64.} 

相关内容