hadoop中mapreduce的常用类(1)


写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的。以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job...不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些。只不过进行了重新组织,进行了一些封装,使得扩展性更好。所以还是把这些东西从记事本贴进来吧。

关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多。

GenericOptionsParser

parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行参数

GenericOptionsParser是为hadoop框架解析命令行参数的工具类。它能够辨认标准的命令行参数,使app能够轻松指定namenode,jobtracker,以及额外的配置资源或信息等。它支持的功能有:

-conf 指定配置文件;

-D 指定配置信息;

-fs 指定namenode

-jt 指定jobtracker

-files 指定需要copy到MR集群的文件,以逗号分隔

-libjars指定需要copy到MR集群的classpath的jar包,以逗号分隔

-archives指定需要copy到MR集群的压缩文件,以逗号分隔,会自动解压缩

1. String[] otherArgs = new GenericOptionsParser(job, args)

2. .getRemainingArgs();

3. if (otherArgs.length != 2) {

4. System.err.println("Usage: wordcount ");

5. System.exit(2);

6. }

ToolRunner

用来跑实现Tool接口的工具。它与GenericOptionsParser合作来解析命令行参数,只在此次运行中更改configuration的参数。

Tool

处理命令行参数的接口。Tool是MR的任何tool/app的标准。这些实现应该代理对标准命令行参数的处理。下面是典型实现:

  1. public class MyApp extends Configured implements Tool {   
  2.            
  3. public int run(String[] args) throws Exception {   
  4. // 即将被ToolRunner执行的Configuration   
  5. Configuration conf = getConf();   
  6.             
  7. // 使用conf建立JobConf   
  8. JobConf job = new JobConf(conf, MyApp.class);   
  9.         
  10. // 执行客户端参数   
  11. Path in = new Path(args[1]);   
  12. Path out = new Path(args[2]);   
  13.             
  14. // 指定job相关的参数        
  15. job.setJobName("my-app");   
  16. job.setInputPath(in);   
  17. job.setOutputPath(out);   
  18. job.setMapperClass(MyApp.MyMapper.class);   
  19. job.setReducerClass(MyApp.MyReducer.class);   
  20. *   
  21. // 提交job,然后监视进度直到job完成   
  22. JobClient.runJob(job);   
  23. }   
  24.           
  25.  public static void main(String[] args) throws Exception {   
  26. // 让ToolRunner 处理命令行参数    
  27. int res = ToolRunner.run(new Configuration(), new Sort(), //这里封装了GenericOptionsParser解析args   
  28.             
  29. System.exit(res);   
  30. }   
  31. }   

MultipleOutputFormat

自定义输出文件名称或者说名称格式。在jobconf中setOutputFormat(MultipleOutputFormat的子类)就行了。而不是那种part-r-00000啥的了。。。并且可以分配结果到多个文件中。

MultipleOutputFormat继承了FileOutputFormat, 允许将输出数据写进不同的输出文件中。有三种应用场景:

a. 最少有一个reducer的mapreduce任务。这个reducer想要根据实际的key将输出写进不同的文件中。假设一个key编码了实际的key和为实际的key指定的位置

b. 只有map的任务。这个任务想要把输入文件或者输入内容的部分名称设为输出文件名。

c. 只有map的任务。这个任务为输出命名时,需要依赖keys和输入文件名。 

  1. //这里是根据key生成多个文件的地方,可以看到还有value,name等参数   
  2. @Override   
  3. protected String generateFileNameForKeyValue(Text key,   
  4. IntWritable value, String name) {   
  5. char c = key.toString().toLowerCase().charAt(0);   
  6. if (c >= 'a' && c <= 'z') {   
  7. return c + ".txt";   
  8. }   
  9. return "result.txt";   
  10. }   

DistributedCache

在集群中快速分发大的只读文件。DistributedCache是MR用来缓存app需要的诸如text,archive,jar等的文件的。app通过jobconf中的url来指定需要缓存的文件。它会假定指定的这个文件已经在url指定的对应位置上了。在job在node上执行之前,DistributedCache会copy必要的文件到这个slave node。它的功效就是为每个job只copy一次,而且copy到指定位置,能够自动解压缩。

DistributedCache可以用来分发简单的只读文件,或者一些复杂的例如archive,jar文件等。archive文件会自动解压缩,而jar文件会被自动放置到任务的classpath中(lib)。分发压缩archive时,可以指定解压名称如:dict.zip#dict。这样就会解压到dict中,否则默认是dict.zip中。

文件是有执行权限的。用户可以选择在任务的工作目录下建立指向DistributedCache的软链接。

  1. DistributedCache.createSymlink(conf);     
  2. DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。则在task的当前工作目录会有link-name的链接,相当于快捷方法,链接到expr.txt文件,在setup方法使用的情况则要简单许多。或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1




相关内容