Hadoop-基于DistributedCache的复制联结


基于DistributedCache的复制联结(其中一个连接表必须小到可以放到内存中)

public class DataJoinDC extends Configured implements Tool {

 public static class MapClass extends MapReduceBase
  implements Mapper{
  ...
 }
 public int run(String[] args) throws Exception{
  ...
 }
 public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(),
     new DataJoinDC(),
     args);
  System.exit(res);
 }
}
去掉reduce类,在map阶段执行联结,并将配置该作业为没有reducer。
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 JobConf job = new JobConf(conf,DataJoinDC.class);

 DistributedCache.addCacheFile(new Path(args[0]).toUri(),conf);

 Path in = new Path(args[1]);//因为仅添加一个文件到分布式缓存中,所以数组长度为1
 Path out = new Path(args[2]);
 FileInputFormat.setInputPaths(jon,in);
 FileOutputFormat.setOutputPath(job,out);

 job.setJobName("DataJoin with DistributedCache");
 job.setMapperClass(MapClass.class);
 job.setNumReduceTasks(0);

 job.setInputFormat(KeyValueTextInputFormat.class);
 job.setOutputFormaty(TextOutputFormat.class);

 job.set("key.value.separator.in.input.line",",");
 JobClient.runJob(job);
 return 0;
}

Mapper接口有方法:map(),configure()和close()
当最初实例化MapClass时,调用configure()方法,而在mapper结束处理其分片时,调用close()方法。
MapReduceBase类为这些方法提供默认的no-op实现。
重写configure(),在mapper第一次初始化时,将连接的数据加载到内存中。(每次调用map()处理一条新纪录时,都可以获得这个数据、)

public static class MapClass extends MapReduceBase
 implements Mapper {

 private Hashtable joinData =
    new Hashtable();//将文件读入名为joinDate的java散列表中,可在mapper的整个、生命周期中获得

 @Override
 public void configure(JobConf conf) {
 try {
 Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
 if (cacheFiles != null && cacheFiles.length >0 ){
  String line;
  String [] tokens;
  
  BufferedReader joinReader = new BufferedReader(
    new FileReader(cacheFiles[0].toString()));

  try {
   while((line = joinReader.readLine())!=null){
    tokens = line.split(",",2);
    joinData.put(tokens[0],tokens[1]);
   }
  }finally{
   joinReader.close();
  }
    }
 }catch(IOException e){
  System.err.println("Exception reading DistributedCache: "+e);
 }
 }
 public void map(Text key,Text value,
   OutputCollector output,
   Reporter reporter) throws IOException{

    String joinValue = joinData.get(key);
    if(joinValue != null){
    output.collect(key,new Text(value.toString()+","+joinValue));//直接输出到hdfs中
    }
 }
}
使用DistributedCache时,可能会出现背景数据(数据连接中较小的数据源)存储于客户端的
本地文件系统中,而不是在hdfs中。

解决方法:调用DistributedCache.addCacheFile()之前将客户端上的本地文件上载到hdfs中。
该过程可在GenericOptionsParser中,通过命令行参数得到支持
bin/hadoop jar -files small_in.txt DataJoinDC.jar big_in.txt output
-files 自动复制一组以逗号分隔的文件到所有的任务节点上
则就不需要调用addCacheFile()方法,也不用将较小数据源的文件名作为参数之一。
参数的索引也移动了。
Path in = new Path(args[0]);
Path in = new Path(args[1]);
这样DistributedCache联结程序就可以让客户端计算机上的本地文件成为一个输入源。

相关内容