浅谈hadoop中mapreduce的文件分发


最近在做数据分析的时候,需要在mapreduce中调用c语言写的接口,此时就需要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有相应的组件来帮助我们完成这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西可以做到第三方文件的分发和缓存功能,下面详解:


如果我们需要在map之间共享一些数据,如果信息量不大,我们可以保持在conf中,但是如果我们需要共享一些配置文件,jar包之类的,此时DistributedCache可以满足我们的需求,使用DistributedCache的步骤如下:

1.正确配置被分发的文件的路径(hdfs上的路径)

2.在自定义的mapper或reducer中获取文件下载到本地后的路径(linux文件系统路径);一般是重写configure或者重写setup(新方式)

3.在自定义的mapper或reducer类中读取这些文件的内容

下面以代码说明三个步骤:

1.Configuration conf = new Configuration();

  DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/libJMeshCalc.so"), conf);

  DistributedCache.addCacheArchive(new URI("/user/tinfo/zhangguochen/libJMeshCalc.zip"),conf);
  DistributedCache.addFileToClassPath(new URI("/user/tinfo/zhangguochen/libMeshCal.jar"), conf);

    或者

    conf.set("mapred.cache.files", "/myapp/file");

   conf.set("mapred.cache. archives", "/mayapp/file.zip");

   以上是配置需要分发的hdfs上的文件,但是前提是这些文件必须在hdfs上存在,看源码可知道DistributedCache的静态方法其实就是封装了conf.set的动作。

2.在自己的mapper类中,使用DistributedCache获取下载到本地的文件,大部分情况下这些操作都是重写configure接口(或者setup),然后把本地文件路径保存在mapper类的成员变量中,供map方法使用,代码如下:

   private Path[] localFiles;

   public void setup(Context context) {

       localFiles = DistributeCache.getLocalCacheFiles(context.getConfiguration());

       for(Path temp:localFiles) {

            String path = temp.toString();//path就是此文件在本地的路径

            if(path.contains("myfileName")) {//获取到自己需要的文件

            }

       }

    }

getLocalCacheFiles返回的是数组(元素类型是Path),数组内容是这个task(map或reduce)所属的job设定的所有需要被分发的文件,如果设置了     多个文件,可以遍历Path数组,用String.contains("KeyWord")来判断是否是你所需要的文件。

   获取压缩包的路径

   private  File[] inputFiles;

   private Path[] localArchives;

   public void setup(Context context) {

       localArchives = DistributeCache.getLocalCacheArvhives();

       for(Path archive : localArchives) {

            if(archive.toString.contains("mytarName")) {//找到自己需要的文件

                 inputFiles = new File(archive.toString()).listFiles();//获取压缩包下的所有 文件

            }

       }

   }

    也可以用DistributedCache将所使用到的第三方jar包加载到classpath中DistributedCache.addFileToClassPath

  

   通过以上代码发现如果要使用这些分发到各个节点上的文件操作比较复杂,DistributedCache也提供一种更方便的用法,即可以为每一个分发的文件创建一个符号链接,然后hadoop就会在当前mapreduce的执行路径下创建一个到源文件的链接,我们就可以在mapreduce中直接使用这些文件,而不必关心这些文件在本地的路径。

  示例:

  1.把文件分发到缓存中

   Configuration conf = new Configuration();

   DistributedCache.createSymlink(conf);//创建符号链接
   DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/file1#myfile"), conf);//加入分布式缓存,myfile是符号


   2.在mapreduce中使用

    public void setup(Context context) {

       File myfile = new File("myfile");//在这里就可以直接通过符号myfile使用此文件

    }

   或者用以下方式:

    conf.set("mapred.cache.files", "/data/data#mData");
     conf.set("mapred.cache.archives", "/data/data.zip#mDataZip");
     conf.set("mapred.create.symlink", "yes"); // 是yes,不是true
     DistributedCache.createSymlink(Configuration)
     在map阶段,只需要File file = new File("mData");即可获得该文件……

以下资料来自网络,如有雷同,纯属意外

DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义;用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点。本文将介绍DistributedCache在Hadoop 1.0和2.0中的使用方法及实现原理。

Hadoop DistributedCache有以下几种典型的应用场景:1)分发字典文件,一些情况下Mapper或者Reducer需要用到一些外部字典,比如黑白名单、词表等;2)map-side join:当多表连接时,一种场景是一个表很大,一个表很小,小到足以加载到内存中,这时可以使用DistributedCache将小表分发到各个节点上,以供Mapper加载使用;3)自动化软件部署:有些情况下,MapReduce需依赖于特定版本的库,比如依赖于某个版本的PHP解释器,一种做法是让集群管理员把这个版本的PHP装到各个机器上,这通常比较麻烦,另一种方法是使用DistributedCache分发到各个节点上,程序运行完后,Hadoop自动将其删除。


Hadoop提供了两种DistributedCache使用方式,一种是通过API,在程序中设置文件路径,另外一种是通过命令行(-files,-archives或-libjars)参数告诉Hadoop,个人建议使用第二种方式,该方式可使用以下三个参数设置文件:

(1)-files:将指定的本地/hdfs文件分发到各个Task的工作目录下,不对文件进行任何处理;

(2)-archives:将指定文件分发到各个Task的工作目录下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中,比如压缩包为dict.zip,则解压后内容存放到目录dict.zip中。为此,你可以给文件起个别名/软链接,比如dict.zip#dict,这样,压缩包会被解压到目录dict中。

(3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自动添加到任务的CLASSPATH环境变量中。

hadoop jar xxx.jar -files hdfs://xxx/xx

hadoop jar xxx.jar -libjars hdfs://xxx/xxx.jar,hdfs://xxx/xx2.jar


前面提到,DistributedCache分发的文件是有可见范围的,有的文件可以只对当前程序可见,程序运行完后,直接删除;有的文件只对当前用户可见(该用户所有程序都可以访问);有的文件对所有用户可见。DistributedCache会为每种资源(文件)计算一个唯一ID,以识别每个资源,从而防止资源重复下载,举个例子,如果文件可见范围是所有用户,则在每个节点上,第一个使用该文件的用户负责缓存该文件,之后的用户直接使用即可,无需重复下载。那么,Hadoop是怎样区分文件可见范围的呢?

在Hadoop 1.0版本中,Hadoop是以HDFS文件的属性作为标识判断文件可见性的,需要注意的是,待缓存的文件即使是在Hadoop提交作业的客户端上,也会首先上传到HDFS的某一目录下,再分发到各个节点上的,因此,HDFS是缓存文件的必经之路。对于经常使用的文件或者字典,建议放到HDFS上,这样可以防止每次重复下载,做法如下:

比如将数据保存在HDFS的/dict/public目录下,并将/dict和/dict/public两层目录的可执行权限全部打开(在Hadoop中,可执行权限的含义与linux中的不同,该权限只对目录有意义,表示可以查看该目录中的子目录),这样,里面所有的资源(文件)便是所有用户可用的,并且第一个用到的应用程序会将之缓存到各个节点上,之后所有的应用程序无需重复下载,可以在提交作业时通过以下命令指定:

-files hdfs:///dict/public/blacklist.txt, hdfs:///dict/public/whilelist.txt

如果有多个HDFS集群可以指定namenode的对外rpc地址:

-files hdfs://host:port/dict/public/blacklist.txt, hdfs://host:port/dict/public/whilelist.txt

DistributedCache会将blacklist.txt和whilelist.txt两个文件缓存到各个节点的一个公共目录下,并在需要时,在任务的工作目录下建立一个指向这两个文件的软连接。

如果可执行权限没有打开,则默认只对该应用程序的拥有者可见,该用户所有应用程序可共享这些文件。

一旦你对/dict/public下的某个文件进行了修改,则下次有作业用到对应文件时,会发现文件被修改过了,进而自动重新缓存文件。

对于一些频繁使用的字典,不建议存放在客户端,每次通过-files指定,这样的文件,每次都要经历以下流程:上传到HDFS上—》缓存到各个节点上—》之后不再使用这些文件,直到被清除,也就是说,这样的文件,只会被这次运行的应用程序使用,如果再次运行同样的应用程序,即使文件没有被修改,也会重新经历以上流程,非常耗费时间,尤其是字典非常多,非常大时。

DistributedCache内置缓存置换算法,一旦缓存(文件数目达到一定上限或者文件总大小超过某一上限)满了之后,会踢除最久没有使用的文件。

在Hadopo 2.0中,自带的MapReduce框架仍支持1.0的这种DistributedCache使用方式,但DistributedCache本身是由YARN实现的,不再集成到MapReduce中。YARN还提供了很多相关编程接口供用户调用,有兴趣的可以阅读源代码。

下面介绍Hadoop 2.0中,DistributedCache通过命令行分发文件的基本使用方式:

(1)运行Hadoop自带的example例子, dict.txt会被缓存到各个Task的工作目录下,因此,直接像读取本地文件一样,在Mapper和Reducer中,读取dict.txt即可:

1 2 3 4 5 6 bin/Hadoopjar \ share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar \ wordcount \ -files hdfs:///dict/public/dict.txt \ /test/input\ /test/output

(2)Hadoop Streaming例子,需要通过-files指定mapper和reducer可执行文件或者脚本文件,这些文件就是通过DistributedCache分发到各个节点上的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #!/bin/bash HADOOP_HOME=/opt/yarn-client INPUT_PATH=/test/input/data OUTPUT_PATH=/test/output/data echo"Clearing output path: $OUTPUT_PATH" $HADOOP_HOME/bin/hadoopfs -rmr $OUTPUT_PATH   ${HADOOP_HOME}/bin/hadoopjar\    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\   -D mapred.reduce.tasks=2\   -files mapper,reducer\   -input $INPUT_PATH\   -output $OUTPUT_PATH\   -mapper mapper\   -reducer reducer

(3)接下给出一个缓存压缩文件的例子,假设压缩文件为dict.zip,里面存的数据为:

1 2 3 4 data/1.txt data/2.txt mapper.list reducer.list

通过-archives参数指定dict.zip后,该文件被解压后,将被缓存(实际上是软连接)到各个Task的工作目录下的dict.zip目录下,组织结构如下:

1 2 3 4 5 6 dict.zip/     data/         1.txt         2.txt     mapper.list     reducer.list

你可以在Mapper或Reducer程序中,使用类似下面的代码读取解压后的文件:

 

1 2 3 File file2 = read(“dict.zip/data/1.txt”, “r”); ……. File file3 = read(“dict.zip/mapper.list”, “r”);

如果你想直接将内容解压到Task工作目录下,而不是子目录dict.zip中,可以用“-files”(注意,不要使用-archives,“-files”指定的文件不会被解压)指定dict.zip,并自己在程序中实现解压缩:

1 2 3 4 #include <cstdlib> ……. system(“unzip –q dict.zip”); //C++代码 ……

总之,Hadoop DistributedCache是一个非常好用的工具,合理的使用它能够解决很多非常困难的问题。 

   总结以下:如果mr程序中需要第三方jar包,可以通过在程序中使用DistributedCache,也可以在命令中使用-libjars来实现,但是这些引入的jar都只可以在mr任务启动之后来使用,如果你在启动MR任务之前调用了第三方jar包的类,那这就会有问题,会在启动任务的时候找不到这个类。此时可以使用如下方式解决:

   在你的project里面建立一个lib文件夹,然后把所有的第三方jar包放到里面去,hadoop会自动加载lib依赖里面的jar。 这样就可以在mr启动之前也可以使用第三方jar了。

   方法调用顺序为(以libjars为例): -libjars --->conf.set("tmpjars")--->

DistributedCache.addArchiveToClassPath--->conf.set("mapreduce.job.cache.archives","")

相关文章链接:http://blog.csdn.net/xiaolang85/article/details/11782539

                        http://blog.csdn.net/lazy0zz/article/details/7505712

              

 

相关内容