Hadoop源码分析之客户端读取HDFS数据
Hadoop源码分析之客户端读取HDFS数据
在使用Hadoop的过程中,很容易通过FileSystem类的API来读取HDFS中的文件内容,读取内容的过程是怎样的呢?今天来分析客户端读取HDFS文件的过程,下面的一个小程序完成的功能是读取HDFS中某个目录下的文件内容,然后输出到控制台,代码如下:
public class LoadDataFromHDFS { public static void main(String[] args) throws IOException { new LoadDataFromHDFS().loadFromHdfs("hdfs://localhost:9000/user/wordcount/"); } public void loadFromHdfs(String hdfsPath) throws IOException { Configuration conf = new Configuration(); Path hdfs = new Path(hdfsPath); FileSystem in = FileSystem.get(conf); //in = FileSystem.get(URI.create(hdfsPath), conf);//这两行都会创建一个DistributedFileSystem对象 FileStatus[] status = in.listStatus(hdfs); for(int i = 0; i < status.length; i++) { byte[] buff = new byte[1024]; FSDataInputStream inputStream = in.open(status[i].getPath()); while(inputStream.read(buff) > 0) { System.out.print(new String(buff)); } inputStream.close(); } } }
FileSystem in = FileSystem.get(conf)这行代码创建一个DistributedFileSystem,如果直接传入一个Configuration类型的参数,那么默认会读取属性fs.default.name的值,根据这个属性的值创建对应的FileSystem子类对象,如果没有配置fs.default.name属性的值,那么默认创建一个org.apache.hadoop.fs.LocalFileSystem类型的对象。但是这里是要读取HDFS中的文件,所以在core-site.xml文件中配置fs.default.name属性的值为hdfs://localhost:9000,这样FileSystem.get(conf)返回的才是一个DistributedFileSystem类的对象。 还有一种创建DistributedFileSystem这种指定文件系统类型对像的方法是使用FileSystem.get(Configuration conf)的一个重载方法FileSystem.get(URI uri, Configuration),其实调用第一个方法时在FileSystem类中先读取conf中的属性fs.default.name的值,再调用的FileSystem.get(URI uri, Configuration)方法。
创建完了读取HDFS的DistributedFileSystem对象就可以按照HDFS的API对HDFS中的文件和目录进行操作了,如列出某个目录中文件和目录,读取文件,写入文件等。现在就来看看从HDFS中读取文件的过程。在上面的代码中首先列出了目录中的所有文件,然后逐个文件进行读取。与使用Java IO读取本地文件类似,首先根据文件的路径创建一个输入流,在Hadoop中使用FileSystem.open()方法来创建输入流,这个方法返回的是一个FSDataInputStream对象,关于Hadoop输入流类结构的设计,参考博文Hadoop源码分析之HDFS客户端的输入流类结构。
调用in.ipen(status[i].getPath());这行代码会返回一个FSDataInputStream对象,在上面的代码中实际是一个DFSClient.DFSDataInputStream类的对象,上面调用FileSystem.open()方法后会进入到DistributedFileSystem.open()方法中,代码如下:
public FSDataInputStream open(Path f, int bufferSize) throws IOException { statistics.incrementReadOps(1); return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); }
方法中,statistics是一个org.apache.hadoop.fs.FileSystem.Statistics类型,它实现了文件系统读写过程中的一些统计。这个方法虽然只有两行,但是却调用了多个方法,首先是getPathName(),获取path在NameNode中的路径,方法的代码如下:
private String getPathName(Path file) { checkPath(file); String result = makeAbsolute(file).toUri().getPath(); if (!DFSUtil.isValidName(result)) { throw new IllegalArgumentException("Pathname " + result + " from " + file+" is not a valid DFS filename."); } return result; }
在这个方法中先调用了checkPath()方法,用于检查路径的合法性,例如保证用户不会在RawLocalFileSystem中创建“hdfs://xxx”这样的路径,其实就是检查了当前DistributedFileSystem对象的URI和根据传入的Path变量的URI。然后将Path对象file转换为文件具体的用/分隔的String路径,返回的结果是result。makeAbsolute()方法就是将相对路径转换为绝对路径,如果file本身就是一个绝对路径,那么就不用转换,在判断路径file是否是一个绝对路径的isAbsolute()方法中,还调用了一个名为hasWindowsDrive()的方法,其代码如下:
/** * 判断是否是Windows上的磁盘路径 * @param path * @param slashed 路径的第一个字符是否是‘/’ */ private boolean hasWindowsDrive(String path, boolean slashed) { if (!WINDOWS) return false; int start = slashed ? 1 : 0; return path.length() >= start+2 && //路径长度大于2,例如长度大于C:的长度 (slashed ? path.charAt(0) == '/' : true) && path.charAt(start+1) == ':' && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path.charAt(start) <= 'z')); }
这个函数判断是否是在Windows操作系统上运行HDFS,Windows操作系统对磁盘的划分使用C、D、E等这些字符开头,会对这些情况做一些特殊的判断。getPathName()方法返回的字符串表示NameNode目录文件树中的文件路径,对于上面读取HDFS内容的代码中给出的路径hdfs://localhost:9000/user/wordcount/,则会返回/user/wordcount/这个字符串,user和wordcount为从根目录开始的两个目录。
getPathName()方法返回后,调用进入到DFSClient.open(String src, int buffersize, boolean verifyChecksum,FileSystem.Statistics stats)方法,在这个方法中调用了DFSClient.DFSInputStream()的构造方法,创建DFSInputStream输入流对象。DFSInputStream是对客户端读取的输入流的抽象,类的定义和成员变量如下:
public class DFSInputStream extends FSInputStream { private Socket s = null; /**检查该对象是否打开**/ private boolean closed = false; /**读取的文件的路径**/ private String src; /**在getBlockLocations()方法中作为请求的数据长度length参数,可一次获取多个数据块的位置信息,这样就不必每读一个数据块就 * 调用ClientProtocol.getBlockLocations()来获取数据块的位置**/ private long prefetchSize = 10 * defaultBlockSize; /**BlockReader的实现类有BlockReaderLocal和RemoteBlockReader,两者都继承自FSInputChecker,从而增加了数据校验能力**/ private BlockReader blockReader = null; /**是否对数据进行校验的标志**/ private boolean verifyChecksum; /**已经解析了位置的数据块信息**/ private LocatedBlocks locatedBlocks = null; /**当前DFSInputStream对象读取文件数据所联系的数据节点**/ private DatanodeInfo currentNode = null; /**当前读取的数据块**/ private Block currentBlock = null; /**在数据块中读取的位置**/ private long pos = 0; private long blockEnd = -1; private int failures = 0; private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>(); /**缓冲区大小**/ private int buffersize = 1; private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
DFSClient.open()方法中调用的DFSInputStream输入流的构造方法代码如下:
DFSInputStream(String src, int buffersize, boolean verifyChecksum ) throws IOException { this.verifyChecksum = verifyChecksum; this.buffersize = buffersize; this.src = src; prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize); openInfo(); }
这个构造方法给成员变量赋值,openInfo()则调用NameNode的IPC方法获取要读取文件的数据块信息,代码如下:
synchronized void openInfo() throws IOException { for (int retries = 3; retries > 0; retries--) { if (fetchLocatedBlocks()) { // fetch block success return; } else { // Last block location unavailable. When a cluster restarts, // DNs may not report immediately. At this time partial block // locations will not be available with NN for getting the length. // Lets retry a few times to get the length. DFSClient.LOG.warn("Last block locations unavailable. " + "Datanodes might not have reported blocks completely." + " Will retry for " + retries + " times"); waitFor(4000); } } throw new IOException("Could not obtain the last block locations."); }
在openInfo()方法中如果读取数据块信息失败,则会每隔4秒读取一次,总共读取3次,主要调用了方法fetchLocatedBlocks()方法来读取进行数据块的读取过程。fetchLocatedBlocks()方法的代码如下:
private boolean fetchLocatedBlocks() throws IOException, FileNotFoundException { LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize); if (newInfo == null) { throw new FileNotFoundException("File does not exist: " + src); } if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) { Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks() .iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { throw new IOException("Blocklist for " + src + " has changed!"); } } } boolean isBlkInfoUpdated = updateBlockInfo(newInfo); this.locatedBlocks = newInfo; this.currentNode = null; return isBlkInfoUpdated; }
在这个方法中,具体对NameNode远程方法的调用在callGetBlockLocations()方法中完成,远程方法调用返回的是LocatedBlocks类型的变量newInfo。其中LocatedBlocks类用于保存客户端方位NameNode定位到的数据块的结果信息。在访问NameNode时,一次调用会定位多个数据块,包含一系列LocatedBlock对象,这样就不用每次要读取一个数据块时就访问NameNode一次了。LocatedBlocks类的成员变量underConstruction标识文件是否处于构建状态,在HDFS中,某文件处于构建状态是指客户端正在写文件,这时候,LocatedBlocks对象中获得的文件长度fileLength后续可能会有变化。
获取到文件的部分数据块之后,newInfo对象是否为null,如果上次已经获取到该文件的部分数据块,那么第二次数据块获取的数据块应该与第一次获取的数据块一致,这里看不不是很明白。
然后,调用updateBlockInfo()方法更新数据块信息,因为如果文件还处于构建状态(underConstruction为true),那么刚才通过callGetBlockLocations()方法获取到的数据块就不是最新的,所以还要再次更新已经更新的数据块。方法最后初始化locatedBlocks和currentNode,currentNode表示客户端当前正在读取的数据节点,因为客户端刚刚才获取到数据块,所以还未读数据块,currentNode为null。
callGetBlockLocations()方法返回文件src文件中从起始部分开始到prefectchSize大小的数据所处的数据块。方法代码如下:
/** * 获取被打开文件的初始数据块位置信息 * @param namenode * @param src 读取的文件路径 * @param start 从文件的什么地方开始读取 * @param length 读取文件的长度 * @return * @throws IOException */ static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException { try { return namenode.getBlockLocations(src, start, length);//调用NameNode.getBlockLocations() } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class); } }
方法很简单,直接调用了远程方法ClientProtocol.getBlockLocations(),其中namenode变量是在DFSClient的构造方法中创建的一个Hadoop IPC对象,它访问NameNode的ClientProtocol接口方法,然后返回所请求的数据对应的数据块信息。
updateBlockInfo()方法更新还处于构建状态的文件的最后一个数据块的信息,它是直接访问某个DataNode节点,而不是NameNode节点,方法的代码如下:
private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException { if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction() || !(newInfo.locatedBlockCount() > 0)) { return true; } LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1); boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo .getFileLength());//TODO ? if (!lastBlockInFile) { return true; } if (last.getLocations().length == 0) { return false; } ClientDatanodeProtocol primary = null; Block newBlock = null; for (int i = 0; i < last.getLocations().length && newBlock == null; i++) { DatanodeInfo datanode = last.getLocations()[i]; try { primary = createClientDatanodeProtocolProxy(datanode, conf, last .getBlock(), last.getBlockToken(), socketTimeout, connectToDnViaHostname);//创建一个DataNode的动态代理对象 //获取数据节点上的数据块的长度 newBlock = primary.getBlockInfo(last.getBlock()); } catch (IOException e) { if (e.getMessage().startsWith( "java.io.IOException: java.lang.NoSuchMethodException: " + "org.apache.hadoop.hdfs.protocol" + ".ClientDatanodeProtocol.getBlockInfo")) { // We're talking to a server that doesn't implement HDFS-200. serverSupportsHdfs200 = false; } else { LOG.info("Failed to get block info from " + datanode.getHostName() + " probably does not have " + last.getBlock(), e); } } finally { if (primary != null) { RPC.stopProxy(primary); } } } if (newBlock == null) { if (!serverSupportsHdfs200) { return true; } throw new IOException( "Failed to get block info from any of the DN in pipeline: " + Arrays.toString(last.getLocations())); } long newBlockSize = newBlock.getNumBytes(); long delta = newBlockSize - last.getBlockSize(); // if the size of the block on the datanode is different // from what the NN knows about, the datanode wins! last.getBlock().setNumBytes(newBlockSize); long newlength = newInfo.getFileLength() + delta; newInfo.setFileLength(newlength); LOG.debug("DFSClient setting last block " + last + " to length " + newBlockSize + " filesize is now " + newInfo.getFileLength()); return true; }
方法中serverSupportsHdfs200指示系统是否支持sync()方法,其中sync()方法保证处于构建状态的文件都输出到了DataNode节点中,但不保证DataNode节点持久化了这些数据。如果系统支持serverSupportsHdfs200,并且文件是处于构建状态的,那么就是执行updateBlockInfo()方法的逻辑,否则直接返回true。
接下来判断获取到的最后一个数据块的所在文件中的偏移量与文件整体大小的关系,如果最后一个数据块的结束部分是文件的末尾,那么就说明最后一个数据块在文件中,并且文件正处于构建状态,则最后一个数据块将会发生变化,则继续执行下面的更新逻辑,否则就表明获取到的数据块所不是文件的最后一个数据块,暂时不需要更新最后一个数据块的信息。
此外如果last.getLocations().length==0,则说明,已经获取到的数据块还未在任何一个DataNode节点中,不能从DataNode中获取数据块的数据信息,所以返回false。
然后就是与保存这个数据块的DataNode通信获取最后一个数据块的数据信息的过程。一个数据块副本可能会保存在多个DataNode节点上,所以就在一个循环中遍历所有保存了这个数据块副本的节点。在调用NameNode.getBlockLocations()方法时返回给客户端的每个数据块保存的DataNode节点是按照DataNode距离客户端的远近排序好的,只要从一个DataNode节点得到了这个数据块的信息,循环就退出了。所以这里读到的只是一个DataNode上的对应数据块的信息,可能与其他保存这个数据块副本的DataNode上的信息有所不同。下面就更新最后一个数据块的信息和文件长度信息。
这样openInfo()方法调用完成,DFSClient.DFSInputStream对象创建成功,在DistributedFileSystem.open()方法中创建一个DFSClient.DFSDataInputStream类型的对象。下面就是读取文件数据的过程。
在LoadDataFromHDFS类中,FSDataInputStream inputStream = in.open(status[i].getPath());的调用得到了一个FSDataInputStream类型的对象,然后调用输入流的read方法就可以读取数据了。
在LoadDataFromHDFS中读取HDFS中的文件,会进入到方法DFSClient.DFSInputStream.read(byte buf[], int off, int len)方法,代码如下:
public synchronized int read(byte buf[], int off, int len) throws IOException { checkOpen();//检查客户端是否已经打开 if (closed) {//检查DFSInputStream是否打开 throw new IOException("Stream closed"); } failures = 0; if (pos < getFileLength()) {//读取的位置未到文件结尾 int retries = 2; while (retries > 0) { try { if (pos > blockEnd) {//已经读到某个数据块的尾部 currentNode = blockSeekTo(pos); } //计算读取的长度 int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L)); int result = readBuffer(buf, off, realLen);//读入数据 if (result >= 0) { pos += result;//更新pos } else { // got a EOS from reader though we expect more data on it. throw new IOException("Unexpected EOS from the reader"); } if (stats != null && result != -1) { stats.incrementBytesRead(result); } return result; } catch (ChecksumException ce) {//检验错误,抛出异常 throw ce; } catch (IOException e) { if (retries == 1) { LOG.warn("DFS Read: " + StringUtils.stringifyException(e)); } blockEnd = -1; if (currentNode != null) { addToDeadNodes(currentNode); }//将节点加入黑名单 if (--retries == 0) {//达到重试上线,抛异常 throw e; } } } } return -1; }
在这个方法中,首先调用checkOpen()方法检查客户端是否打开(正在运行),然后是当前DFSInputStream对象是否关闭。failures变量在每次调用DFSInputStream的读取数据方法时都会置0,表示此次读取失败的次数,当这个值达到一个阈值时,会抛出异常。pos变量表示读取文件过程中,下一个字符在文件中的偏移量,也就是下次读取的起始位置,blockEnd表示已经读到数据块的尾部。如果读到数据块的尾部,则会调用blockSeekTo()方法获取下一个数据块所在的DataNode节点,然后调用readBuffer()进行读取。调用readBuffer()方法完成后,就更新pos指针,并返回读取的字节数。blockSeekTo()方法和readBuffer()方法是这个过程中的主要方法,先来分析blockSeekTo()方法,代码如下:
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } if ( blockReader != null ) {//如果BlockReader对象非空则关闭该对象 blockReader.close(); blockReader = null; } if (s != null) {//到上一个数据节点的Socket还打开着,关闭连接 s.close(); s = null; } // // Connect to best DataNode for desired Block, with potential offset // DatanodeInfo chosenNode = null; int refetchToken = 1; // only need to get a new access token once while (true) { // // Compute desired block // LocatedBlock targetBlock = getBlockAt(target, true); assert (target==this.pos) : "Wrong postion " + pos + " expect " + target; long offsetIntoBlock = target - targetBlock.getStartOffset();//target与这个数据块的第一个字节的距离,即从这个数据块的哪个位置开始读数据 DNAddrPair retval = chooseDataNode(targetBlock); chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; // try reading the block locally. if this fails, then go via // the datanode Block blk = targetBlock.getBlock(); Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken(); //是否使用本地读优化 if (shouldTryShortCircuitRead(targetAddr)) { try { blockReader = getLocalBlockReader(conf, src, blk, accessToken, chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); return chosenNode; } catch (AccessControlException ex) { LOG.warn("Short circuit access failed ", ex); //Disable short circuit reads shortCircuitLocalReads = false; } catch (IOException ex) { if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) { /* Get a new access token and retry. */ refetchToken--; fetchBlockAt(target); continue; } else { LOG.info("Failed to read " + targetBlock.getBlock() + " on local machine" + StringUtils.stringifyException(ex)); LOG.info("Try reading via the datanode on " + targetAddr); } } } //没有使用本地读优化,创建Socket对象 try { s = socketFactory.createSocket(); LOG.debug("Connecting to " + targetAddr); NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout); s.setSoTimeout(socketTimeout); blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(), accessToken, blk.getGenerationStamp(), offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, clientName); return chosenNode; } catch (IOException ex) { if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) { refetchToken--; fetchBlockAt(target); } else { LOG.warn("Failed to connect to " + targetAddr + ", add to deadNodes and continue" + ex); if (LOG.isDebugEnabled()) { LOG.debug("Connection failure", ex); } // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } if (s != null) { try { s.close(); } catch (IOException iex) { } } s = null; } } }
该方法表示在读操作越过某个数据块边界时,用于建立到下一个数据块所在数据节点的联系,并初始化所需要的BlockReader对象。方法中如果blockReader对象和s对象不为空,则会关闭,因为上一个数据块已经读取完毕,要读取下一个数据块,而这个要读取的数据块会在另个DataNode节点上,需要重新建立Socket连接并重新初始化BlockReader对象。其中BlockReader负责数据块的读取,下面会详细分析到。
在while循环中首先调用getBlockAt()方法,根据下一个读取位置获取文件中这个位置的LocatedBlock对象,该对象包含了这个位置所处的数据块,在getBlockAt()方法中先利用DFSInputStream中已经缓存的LocatedBlock实例获取这个数据块,如果不存在,即这个数据块没信息有在上次与NameNode交互的过程中返回,就使用callGetBlockLocations()方法,再次从NameNode中获取并缓存数据块的位置信息。getBlockAt()方法调用的LocatedBlocks.findBlock()使用二分查找算法从已经获取的数据块数组中找所需要的数据块,如果在没有找到,则调用NameNode.getBlockLocations()方法从NameNode中获取所需要的数据块,这个过程中也会返回多个数据块,这个调用完成后,会调用LocatedBlocks.insertRange()将新获取的数据块加入到locatedBlocks中,最后根据参数updatePosition的值,更新pos,blockEnd和currentBlock这三个值。
接下来调用chooseDataNode()方法选择一个对于客户端来说最优的数据节点读取节点上的数据块,NameNode.getBlockLocations()返回数据块时,会对数据块所在的数据节点列表进行整理,将离客户端网络距离近的节点放在数组的最前面,已经是一个按优先级排序的数组,在chooseDataNode()的执行过程中,会调用方法bestNode()排除掉客户端联系不上的“死”节点,返回可用的数据节点位置信息。
如果配置了客户端的本读读优化,并且数据块已经创建了,且上面获取到的数据节点地址是本机地址,那么如果当前数据块与客户端在同一台主机上,则会优先读取本地的数据块,否则就读取远程机器上面的数据块。对于数据节点间的数据相关操作来说,可能会进行本地数据块读取,但是非HDFS集群的机器来说只会读取HDFS中的内容的程序则只会读取远程机器上的数据块。无论是本地读取还是远程读取都会创建一个BlockReader接口的对象,这个接口的实现类用于处理数据块的读取操作,它有两个实现类,分别是BlockReaderLocal和RemoteBlockReader,这两个类都继承自FSInputChecker这个抽象类,BlockReaderLocal和RemoteBlockReader这两个类负责具体的数据的读取,FSInputChecker则为数据输入提供了检验能力。可以分析这个数据读取过程中的方法调用先后关系:
DFSInputStream.read(byte buf[], int off, int len)方法中调用DFSInputStream.readBuffer(byte buf[], int off, int len)来实现数据读取,而readBuffer方法则是调用BlockReader.read(byte[] buf, int off, int len)方法,由于BlockReader有两个实现类,以RemoteBlockReader.read(byte[] buf, int off, int len)为例来分析,在这个方法中通过super.read(buf, off, len);这行语句来读取数据,而这行语句则是对方法FSInputChecker.read(byte[] b, int off, int len)的调用,在FSInputChecker抽象类中又调用了read1方法和readChecksumChunk方法,最后的调用则是FSInputChecker.readChunk(long pos, byte[] buf, int offset, int len, byte[] checksum)。
读取数据完成之后,就可以关闭输入流了,DFSInputStream类重写了close方法来关闭输入流,这个方法代码如下:
public synchronized void close() throws IOException { if (closed) { return; } checkOpen(); if ( blockReader != null ) { blockReader.close(); blockReader = null; } if (s != null) { s.close(); s = null; } super.close(); closed = true; }
分别关闭了blockReader和Socket对象s,再设置标识closed为true。
Reference
《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》
评论暂时关闭