Hadoop HDFS (3) JAVA访问HDFS之二 文件分布式读写策略,hadoophdfs


先把上节未完成的部分补全,再剖析一下HDFS读写文件的内部原理

列举文件

FileSystem(org.apache.hadoop.fs.FileSystem)的listStatus()方法可以列出一个目录下的内容。
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws FileNotFoundException, IOException;
这一组方法,都接收Path参数,如果Path是一个文件,返回值是一个数组,数组里只有一个元素,是这个Path代表的文件的FileStatus对象;如果Path是一个目录,返回值数组是该目录下的所有文件和目录的FileStatus组成的数组,有可能是一个0长数组;如果参数是Path[],则返回值相当于多次调用单Path然后把返回值整合到一个数组里;如果参数中包含PathFilter,则PathFilter会对返回的文件或目录进行过滤,返回满足条件的文件或目录,条件由开发者自定义,用法与java.io.FileFilter相似。 下面这个程序接收一组paths,然后列出其中的FileStatus
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class ListStatus {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
       
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
       
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
上传程序,然后执行: $hadoop ListStatus / /user /user/norris 则列出/下,/user/下,/user/norris/下的所有文件和目录。 在Hadoop下执行程序的方法见上一篇博客(http://blog.csdn.net/norriszhang/article/details/39648857

File patterns 用通配符列出文件和目录

FileSystem的globStatus方法就是利用通配符来列出文件和目录的。glob就是通配的意思。 FileSystem支持的通配符有: *:匹配0个或多个字符 ?:匹配1个字符 [ab]:匹配方括号中列出的字符 [^ab]:匹配方括号中没有列出的字符 [a-b]:匹配方括号中列出的字符范围 [^a-b]:匹配方括号中列出的字符范围以外的字符 {a,b}:或者匹配a或者匹配b \c:转义,如果c是一个元字符,就代表这个字符本身,比如\[,就表示字符[
public FileStatus[] globStatus(Path pathPattern) throws IOException;
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;
虽然pathPattern很强大,但是也有些情况不能满足,比如就是要排除某个特定文件,这时就需要使用PathFilter了。
package org.apache.hadoop.fs;
public interface PathFilter {
    boolean accept(Path path);
}
这是PathFilter这个接口的定义,使用时只要实现accept方法,返回是否选中该Path就行了。这个accept方法接收的参数是一个Path,也就是说,有用的信息几乎只能拿到路径和文件名,像修改时间啊、权限啊、所有者呀、大小啊什么的,都拿不到,没有FileStatus那样强大,所以,如果我们希望按修改时间来选文件时,就要在文件的命名时带上时间戳了。(当然,通过FileSystem也能再次获得那些信息,但是。。。那也太不值当的了吧?我也不确定会有多大消耗)

删除文件

FileSystem的delete方法删除一个文件或目录(永久删除)。 public boolean delete(Path f, boolean recursive) throws IOException; 删除f,如果f是一个文件或空目录,则不管recursive传什么,都删除,如果f是一个非空目录,则recursive为true时目录下内容全部删除,如果recursive为false,不删除,并抛出IOException。

下面再深入剖析HDFS读写文件时的数据流向过程

读文件剖析

第一步:客户端通过调用FileSystem.open()方法打开一个文件,对于HDFS来讲,其实是调用的DistributedFileSystem实例的open方法; 第二步:DistributedFileSystem通过远程方法调用(RPC)访问namenode,获取该文件的前几个blocks所在的位置信息;针对每个block,namenode都会返回有该block数据信息的所有datanodes节点,比如配置的dfs.replication为3,就会每个block返回3个datanodes节点信息,这些节点是按距离客户端的远近排序的,如果发起读文件的客户端就在包含该block的datanode上,那该datanode就排第一位(这种情况在做Map任务时常见),客户端就会从本机读取数据。关于如何判断离客户端的距离远近的问题,一会儿的网络拓扑理论上会讲到。 DistributedFileSystem的open方法返回一个FSDataInputStream,FSDataInputStream里包装着一个DFSInputStream,DFSInputStream真正管理datanodes和namenode的I/O; 第三步:客户端调用FSDataInputStream.read()方法,FSDataInputStream里已经缓存了该文件前几个block所在的datanode的地址,于是从第一个block的第一个地址(也就是最近的datanode)开始连接读取; 第四步:反复调用read()方法,数据不断地从datanode流向客户端; 第五步:当一个block的数据读完了,DFSInputStream会关闭当前datanode的连接,打开下一个block所在的最优datanode的连接继续读取;这些对客户端是透明的,在客户端看来,就是在读一个连续的流; 第六步:这样一个block一个block读下去,当需要更多block的存储信息时,DFSInputStream会再次调用namenode,获取下一批block的存储位置信息,直到客户端停止读取,调用FSDataInputStream.close()方法,整个读取过程结束。
在读取过程中,如果DFSInputStream在与Datanode通信时发生了错误,它会试着向下一个最近的datanode节点获取当前block数据,DFSInputStream也会记录下发生错误的datanode节点,以便在以后block数据的读取时,不再去这些节点上尝试。 DFSInputStream在读取到datanode上的block数据后也会做checksum校验,如果checksum失败,它会先向namenode报告这台datanode上的数据有问题,然后再去尝试一下个存有当前block的datanode。 在这一整套的设计上,最重要的一点是:客户端在namenode的指引下,直接向最优datanode读取数据,这样的设计让HDFS支持大规模的并发,因为客户端读取数据的流量分布在集群的每个节点上,namenode只是通过内存提供位置信息而不提供数据,如果客户端都通过namenode获得数据,那客户端的数量就大大受限制了。

Hadoop如何决定哪个datanode离客户端最近

在网络上什么叫“近”?在大数据流动时,带宽是最稀缺的资源,因此,用两个节点之前的带宽来定义它们之间的距离很合理。 在实践中,那么多节点,在每两个节点之间都测量带宽是不现实的,Hadoop采取了折中的方式,它把网络结构想象成一棵树,两个结点之间的距离就是两个节点分别向上找父、祖父、祖宗。。。直到两个节点有一个共同祖宗时,它们俩走的步数之和。没有人规定树必须有多少级,但通常的做法是分成“数据中心”、“机架”、“节点”三级,越排在前面的,之间通信带宽越小,比如两个数据中心之间通信要比同数据中心两个机架之间通信要慢,两个机架之间通信要比同机架的两个节点通信慢。所以,按照由快到慢分别是: - 本机 - 同机架的两个节点 - 同数据中心不同机架的两个节点 - 不同数据中心的两个节点 假如用d表示数据中心,r表示机架,n表示节点,那/d1/r1/n1就表示1数据中心1机架上的1号节点。 - distance(/d1/r1/n1, /d1/r1/n1) = 0 //同一台机器 - distance(/d1/r1/n1, /d1/r1/n2) = 2 //同一个机架上的两台机器,它们各自到共同父结点r1的步数都是1,因此距离是2 - distance(/d1/r1/n1, /d1/r2/n3) = 4 //同数据中心的两个机架 - distance(/d1/r1/n1, /d2/r3/n4) = 6 //不同的数据中心
最后,Hadoop是无法知道你的网络拓扑结构的,所以你得通过配置告诉它。默认情况下,它认为所有的节点都是同一个机架上的节点,也就是任意两台之间的距离都是相同的。在小规模的集群中,这个默认配置就够用了,但是大的集群需要更多的配置,以后讲到集群配置时再说。

写文件剖析

文件是怎么被写进HDFS的呢?下面的介绍可能过于细致了,但是这样才能有助于理解HDFS的数据一致性模型。 我们来讨论一下创建新文件,向里写数据,然后关闭文件的一个过程: 第一步:客户端调用DistributedFileSystem.create()方法创建一个文件; 第二步:DistributedFileSystem向namenode发起远程方法调用(RPC),创建一个文件,但是namenode没有把它关联到任何block上去;namenode在这一步做了很多检查工作,保证该文件当前不存在,客户端有创建该文件的权限等。如果这些检查都通过了,namenode创建一条新文件记录,否则,创建失败,客户端返回IOException。DistributedFileSystem返回一个FSDataOutputStream,像读文件时一样,这个FSDataOutputStream里包装着一个DFSOutputStream,由它来实际处理与datanodes和namenode的通信。 第三步:客户端向DFSOutputStream里写数据,DFSOutputStream把数据分成包,丢进一个称为data queue的队列中,DataStreamer负责向namenode申请新的block,新的block被分配在了n(默认3)个节点上,这3个节点就形成一个管道。 第四步:DataStreamer把data queue里的包拿出来通过管道输送给第一个节点,第一个节点再通过管道输送给第二个节点,第二个再输送给第三个。 第五步:DFSOutputStream同时还在内部维护一个通知队列,名叫ack queue,里面是发过的数据包,一个包只有被所有管道上的datanodes通知收到了,才会被移除。如果任意一个datanode接收失败了,首先,管道关闭,然后把ack queue里的包都放回到data queue的头部,以便让失败节点下游节点不会丢失这些数据。当前已经成功接收数据了的节点将会经与namenode协商后分配一个新的标识,以便当坏节点以后恢复回来时可以把上面的不完整数据删除。然后打开管道把坏节点移出,数据会继续向其它好节点输送,直到管道上的节点都完成了,这时其实是少复制了一个节点,向namenode报告一下说现在这个block没有达到设定的副本数,然后就返回成功了,后期namenode会组织一个异步的任务,把副本数恢复到设定值。然后,接下来的数据包和数据块正常写入。以上操作,对客户端都是透明的,客户端不知道发生了这些事情,只知道写文件成功了。 如果多个datanodes都失败了怎么办呢?hdfs-site.xml里有个配置dfs.replication.min,默认值是1,意思是只要有1个datanode接收成功,就认为数据写入成功了。客户端就会收到写入成功的返回。后期Hadoop会发起异步任务把副本数恢复到dfs.replication设置的值(默认3)。 第六步:当客户端完成数据写入,调用流的close()方法,这个操作把data queue里的所有剩余的包都发给管道。 第七步:等所有包都收到了写成功的反馈,客户端通知namenode写文件完成了。因为DataStream写文件前就先向namenode申请block的位置信息了,所以写文件完成时,namenode早已知道每个block都在哪了,它只需等最小的副本数写成功,就可以返回成功了。

Namenode如何选择一个block被写到哪几个节点上去?

Hadoop在这个算法上是做了权衡处理的。都写到同一个节点上,或者写在同一个机架的节点上,肯定是效率最高的,因为数据传输的带宽最大,但这就不是分布式冗余了,万一这个节点失败,或者这个机架掉电,这份数据就再也读不到了。当然随意写到三台机器上,最好分在不同的数据中心才最安全,但是那样又太损失效率了。即使是在同一个数据中心的节点上写,也有很多种选择策略。从1.x版开始,这个策略就变成可插拔的了。 当前Hadoop默认的策略是: 第一份:如果客户端就运行在当前集群上,那第一个副本就存在当前节点上,如果客户端不运行在当前集群上,则随机选择第一个副本节点。当然这个随机是会考虑不要选已经有了很多数据或当前正在处理很大流量的datanode的; 第二份:选择与第一份不在同一个机架上的随机一个节点; 第三份:选择与第二份在同一个机架上的另一个随机节点; 更多份:如果需要复制更多份,其它节点是随机选择的,只是尽量分布在多个机架上,不让一个机架上有太多份副本。 *该书写作时Hadoop不支持跨数据中心部署,现在的版本不知道是不是去掉了这个限制,如果是,那这个策略是不是也会考虑跨数据中心,暂时还不清楚。 总体来看,这样的策略平衡考虑了可靠性(数据分布在不同的机架上)、写带宽(只有一次写需要跨机架)、读性能(读数据时有两上机架上的datanodes可选),数据分布在整个集群上。

数据一致性模型

对于文件系统来讲,所谓数据一致性模型,就是说一个写文件操作写进的数据,在什么时机可以被其它读文件的操作看到。HDFS在数据一致性方面做了平衡,因此可能不像本地文件系统那样,写进去的数据马上可以读到。 当一个文件被创建时,它是马上可以被看到的,但是当数据写进时,即使调用flush,读文件的操作也未必能看到,这个文件的长度可能还是0。(前一节讲文件写入时的progress回调时,我曾做了实验,一边往里写,很次回调时睡1秒,然后另一边不停看文件写进去多大了,结果发现一直是0,直到程序结束完成了写入,才看到文件的真实大小,当时以为是没有flush,现在看来其实是这种特殊的数据一致性模型导致的。) HDFS的数据一致性模型是以block为单位的,一个block被写完了,会看到一个block的数据,没写完一个block,就看不到这个block的数据。block 1写完了,其它读操作能看到这个block的内容,这时block 2正在写入,但是其它读操作却看不到,直到block 2完成,开始写block 3,block 2的数据才可以被其它读操作看到。 HDFS提供了一个方法,FSDataOutputStrean.sync(),强制让当前已写入的数据对其它读操作可见。在1.x以后的版本中,这个sync()方法被废弃了,改用hflush(),另外还有一个hsync()方法,声明说是更强的保证数据一致性,但到写书时为止,hsync()方法没有被实现,只是简单地调用了hflush()而已。 关闭文件时会非显示调用sync()方法,也就是被关闭了的文件,其全部数据都可以被其它读者看到了。
这一数据一致性模型对于应用程序是有影响的。应用程序的开发者应该心里清楚,当写操作进行时如果读数据,或者当客户端或系统出现问题时,可能会有最多一个block的数据丢失。如果你的应用不能接受,那就要彩取适当的策略在适当的时候调用hflush()方法,但是频繁调用hflush会影响吞吐量,所以你要在程序健壮性和吞吐量两方面做出权衡,选择适当的调用hflush的频率。

用Flume和Sqoop导入数据

写程序把数据放入HDFS,不如用已有的工具。因为现在已经有很成熟的工具来完成这件事,而且已经覆盖了大部分的需求。 Flume是Apache的大量数据移动的一个工具。其中一个典型的应用就是把Flume部署在web server的机器上,把web server上的日志收集起来导入到HDFS。它同时也支持各种日志写入。 Sqoop也是Apache的工具,它用于把大量结构化数据批量导入HDFS,比如把关系型数据库里的数据导入到Hive里。Hive是运行在Hadoop上的数据仓库,后面章节讲到。

用 java遍历hadoop分布式文件系统中某个目录下的全部文件,我的hadoop是单节点的

原因:
你访问的是本地文件系统而非hdfs , 因为Configuration默认的是在core-default.xml中的属性fs.default.name默认值是file:///,表示本地文件系统。在我们new Configuration();时会默认加载core-default.xml文件,所以根据这个文件的fs.default.name值使用了本地文件系统。

解决方法:
一般安装hadoop时都是修改core-site.xml文件,这个文件设置的属性值一般使用来覆盖core-default.xml这个文件的,在core-site.xml文件中会设置fs.default.name值为hadoop的namenode的地址以及端口号,如hdfs://localhost:9000,即表示namenode是本机,也就是为分布式。所以我们在连接hdfs时需要指定连接的地址,也就是hadoop集群中core-site.xml中fs.default.name属性值。所以解决方法有三种:
1)在代码Configuration conf=new Configuration();之后手动为Configuration对象设置fs.default.name属性值,如:conf.set("fs.default.name","hdfs:localhost:9000");
2)在代码的classpath下创建一个文件,在文件中设置fs.default.name属性值,再使用conf.addResource("文件路径")将该文件添加到Configuration中;
3)直接将集群的core-site.xml添加到classpath下即可,无需手动添加到Configuration,在new Configuration时会自动加载该文件
 

javaapi操作hadoop的hdfs需要什权限?

不知道你说的“调用Filesystem命令”是什么意思;使用hadoop fs -rm可以正常执行,MapReduce也可以正常读写HDFS文件,说明HDFS文件系统没有问题。你不妨试一试hadoop fs -chmod a+rwx <path>设置rwx权限后,再试一次。
 

相关内容