DistributedCache会跟踪修改缓存文件的timestamp。

下面是使用的例子, 为应用app设置缓存

  1. $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat     
  2. $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip     
  3. $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar   
  4. $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar   
  5. $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz   
  6. $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz   

2. 设置app的jobConf:

  1. JobConf job = new JobConf();   
  2. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),    
  3.  job);   
  4. DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);  
  5. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);   
  6. DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);   
  7. DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);   
  8. DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);   

3. 在mapper或者reducer中使用缓存文件:

  1. public static class MapClass extends MapReduceBase     
  2. implements Mapper<K, V, K, V> {   
  3.      
  4. private Path[] localArchives;   
  5. private Path[] localFiles;   
  6.            
  7. public void configure(JobConf job) {   
  8. // 得到刚刚缓存的文件   
  9. localArchives = DistributedCache.getLocalCacheArchives(job);   
  10. localFiles = DistributedCache.getLocalCacheFiles(job);   
  11. }   
  12.            
  13. public void map(K key, V value,    
  14.  OutputCollector<K, V>; output, Reporter reporter)    
  15. throws IOException {   
  16. // 使用缓存文件   
  17. // ...   
  18. // ...   
  19. output.collect(k, v);   
  20. }   
  21. }   

它跟GenericOptionsParser的部分功能有异曲同工之妙。

PathFilter + 通配符。accept(Path path)筛选path是否通过。

NullWritable

不想输出的时候,把它当做key。NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。

FileInputFormat继承于InputFormat

InputFormat的作用:

验证输入规范;

切分输入文件为InputSpilts;

提供RecordReader来收集InputSplit中的输入记录,给Mapper进行执行。

RecordReader

将面向字节的InputSplit转换为面向记录的视图,供Mapper或者Reducer使用运行。因此假定处理记录的责任界限,为任务呈现key-value。

SequenceFile:

SequenceFile是包含二进制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter来进行写、读、排序功能。基于CompressionType,SequenceFile有三种对于kv的压缩方式:

●Writer:不压缩records;

RecordCompressWriter: 只压缩values;

BlockCompressWriter: 压缩records,keys和values都被分开压缩在block中,block的大小可以配置;

压缩方式由合适的CompressionCodec指定。推荐使用此类的静态方法createWriter来选择格式。Reader作为桥接可以读取以上任何一种压缩格式。

CompressionCodec:

封装了关于流式压缩/解压缩的相关方法。

Mapper

Mapper 将输入的kv对映射成中间数据kv对集合。Maps 将输入记录转变为中间记录,其中被转化后的记录不必和输入记录类型相同。一个给定的输入对可以映射为0或者多个输出对。

在MRJob执行过程中,MapReduce框架根据提前指定的InputFormat(输入格式对象)产生InputSplit(输入分片),而每个InputSplit将会由一个map任务处理。

总起来讲,Mapper实现类通过JobConfigurable.configure(JobConf)方法传入JobConf对象来初始化,然后在每个map任务中调用map(WritableComparable,Writable,OutputCollector,Reporter)方法处理InputSplit的每个kv对。MR应用可以覆盖Closeable.close方法去处理一些必须的清理工作。

输出对不一定和输入对类型相同。一个给定的输入对可能映射成0或者很多的输出对。输出对是框架通过调用OutputCollector.colect(WritableComparable,Writable)得到。

MR应用可以使用Reporter汇报进度,设置应用层级的状态信息,更新计数器或者只是显示应用处于运行状态等。

所有和给定的输出key关联的中间数据都会随后被框架分组处理,并传给Reducer处理以产生最终的输出。用户可以通过JobConf.setOutputKeyComparatorClass(Class)指定一个Comparator控制分组处理过程。

Mapper输出都被排序后根据Reducer数量进行分区,分区数量等于reduce任务数量。用户可以通过实现自定义的Partitioner来控制哪些keys(记录)到哪个Reducer中去。

此外,用户还可以指定一个Combiner,调用JobConf.setCombinerClass(Class)来实现。这个可以来对map输出做本地的聚合,有助于减少从mapper到reducer的数据量。

经过排序的中间输出数据通常以一种简单的格式(key-len,key,value-len,value)存储在SequenceFile中。应用可以决定是否或者怎样被压缩以及压缩格式,可以通过JobConf来指定CompressionCodec.

如果job没有reducer,那么mapper的输出结果会不经过分组排序,直接写进FileSystem.




相关内容