Hadoop Filesystem Support Demystified: S3


1. Introduction

  Hadoop ships with support for a number of filesystems, but I had never looked closely at how those filesystems are actually implemented or what the underlying mechanism is. Today someone asked me exactly that question: how does Hadoop's support for S3 work? This post is a summary of the answer. The filesystems Hadoop supports include:

  Filesystem          URI scheme    Hadoop implementation class
  Local               file          fs.LocalFileSystem
  HDFS                hdfs          hdfs.DistributedFileSystem
  HFTP                hftp          hdfs.HftpFileSystem
  HSFTP               hsftp         hdfs.HsftpFileSystem
  HAR                 har           fs.HarFileSystem
  KFS                 kfs           fs.kfs.KosmosFileSystem
  FTP                 ftp           fs.ftp.FTPFileSystem
  S3 (native)         s3n           fs.s3native.NativeS3FileSystem
  S3 (block-based)    s3            fs.s3.S3FileSystem

  (All implementation class names are relative to the org.apache.hadoop package.)
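  How a URI is mapped to one of these classes is worth spelling out: FileSystem.get(uri, conf) reads the URI scheme and looks up the corresponding implementation class from the configuration (the fs.<scheme>.impl properties shipped in core-default.xml in the Hadoop versions discussed here), then instantiates it. The following is only a minimal sketch, assuming the Hadoop 1.x-era credential property names (fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey) and a placeholder bucket name:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemeResolutionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Credentials for the native S3 filesystem (placeholder values).
    conf.set("fs.s3n.awsAccessKeyId", "<your-access-key>");
    conf.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>");

    // The "s3n" scheme resolves to fs.s3native.NativeS3FileSystem; an
    // "hdfs://" or "file://" URI would resolve to DistributedFileSystem
    // or LocalFileSystem through exactly the same lookup.
    FileSystem fs = FileSystem.get(URI.create("s3n://my-bucket/"), conf);
    System.out.println(fs.getClass().getName());

    // List the bucket root through the ordinary FileSystem API.
    for (FileStatus status : fs.listStatus(new Path("/"))) {
      System.out.println(status.getPath());
    }
  }
}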

2. The Competing Views

  1. Does Hadoop support S3 by implementing an S3 filesystem of its own?

  2. Or does Hadoop support S3 by integrating with the existing S3 service through its interface, i.e. as an S3 client?

3. Source Code Analysis

  
               
          
The class to look at is org.apache.hadoop.fs.s3.Jets3tFileSystemStore, the backing store used by S3FileSystem (the block-based s3:// scheme). The excerpt below is abridged from the Hadoop source: repetitive error handling and several helper methods are collapsed into comments.

class Jets3tFileSystemStore implements FileSystemStore {

  private static final String FILE_SYSTEM_NAME = "fs";
  private static final String FILE_SYSTEM_VALUE = "Hadoop";
  private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
  private static final String FILE_SYSTEM_TYPE_VALUE = "block";
  private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
  private static final String FILE_SYSTEM_VERSION_VALUE = "1";

  // Populated (in a static initializer) with the three name/value pairs above and
  // attached to every INode object so that foreign, non-Hadoop objects can be detected.
  private static final Map<String, String> METADATA = new HashMap<String, String>();

  private static final String PATH_DELIMITER = Path.SEPARATOR;
  private static final String BLOCK_PREFIX = "block_";

  private Configuration conf;
  private S3Service s3Service;   // JetS3t REST client for the Amazon S3 service
  private S3Bucket bucket;
  private int bufferSize;

  public void initialize(URI uri, Configuration conf) throws IOException {
    this.conf = conf;
    S3Credentials s3Credentials = new S3Credentials();
    s3Credentials.initialize(uri, conf);
    try {
      AWSCredentials awsCredentials = new AWSCredentials(
          s3Credentials.getAccessKey(), s3Credentials.getSecretAccessKey());
      this.s3Service = new RestS3Service(awsCredentials);
    } catch (S3ServiceException e) {
      throw new S3Exception(e);   // IOException causes are unwrapped first in the real code
    }
    bucket = new S3Bucket(uri.getHost());
    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
  }

  // Fetch an object from the bucket; returns null if the key does not exist.
  private InputStream get(String key, boolean checkMetadata) throws IOException {
    try {
      S3Object object = s3Service.getObject(bucket, key);
      if (checkMetadata) {
        checkMetadata(object);
      }
      return object.getDataInputStream();
    } catch (S3ServiceException e) {
      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
        return null;
      }
      throw new S3Exception(e);
    }
  }

  // Reject objects that were not written by this filesystem layer.
  private void checkMetadata(S3Object object) throws S3FileSystemException {
    String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
    if (!FILE_SYSTEM_VALUE.equals(name)) {
      throw new S3FileSystemException("Not a Hadoop S3 file.");
    }
    String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
    if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
      throw new S3FileSystemException("Not a block file.");
    }
    // fs-version is compared against FILE_SYSTEM_VERSION_VALUE in the same way.
  }

  // File and directory metadata (INodes) are stored as ordinary S3 objects.
  public INode retrieveINode(Path path) throws IOException {
    return INode.deserialize(get(pathToKey(path), true));
  }

  private void put(String key, InputStream in, long length, boolean storeMetadata) throws IOException {
    try {
      S3Object object = new S3Object(key);
      object.setDataInputStream(in);
      object.setContentType("binary/octet-stream");
      object.setContentLength(length);
      if (storeMetadata) {
        object.addAllMetadata(METADATA);
      }
      s3Service.putObject(bucket, object);
    } catch (S3ServiceException e) {
      throw new S3Exception(e);
    }
  }

  public void storeINode(Path path, INode inode) throws IOException {
    put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
  }

  // Data blocks are stored under keys like "block_<id>".
  private String blockToKey(long blockId) {
    return BLOCK_PREFIX + blockId;
  }

  // getVersion, deleteINode/deleteBlock, inodeExists/blockExists, storeBlock,
  // retrieveBlock, newBackupFile, pathToKey/keyToPath, listSubPaths/listDeepSubPaths,
  // purge and dump follow the same pattern: each delegates to s3Service.getObject /
  // putObject / deleteObject / listObjects -- that is, to the S3 service itself.
}
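To see the block-based layout in practice, one can write a file through the s3:// filesystem and then list the bucket with an ordinary S3 client: the bucket ends up holding a serialized INode object under the file's path plus one block_<id> object per data block, all carrying the fs / fs-type / fs-version metadata shown above, rather than the file itself under its own key. The following is only a hedged sketch (bucket name, path and credentials are placeholders; the fs.s3.* property names are the ones used by Hadoop 1.x):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockStoreDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3.awsAccessKeyId", "<your-access-key>");      // placeholder
    conf.set("fs.s3.awsSecretAccessKey", "<your-secret-key>");  // placeholder

    // "s3" (without the "n") selects the block-based S3FileSystem, which is
    // backed by the Jets3tFileSystemStore shown above.
    FileSystem fs = FileSystem.get(URI.create("s3://my-bucket/"), conf);

    // Writing this file produces an INode object at the key "/demo/hello.txt"
    // plus "block_<id>" objects for the data -- not a plain object named
    // hello.txt. That is why s3:// buckets are readable only through Hadoop,
    // whereas s3n:// stores files as ordinary, interoperable S3 objects.
    FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"));
    out.writeUTF("hello, S3 block store");
    out.close();
  }
}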

 

4. Seeing Is Believing

5. Conclusion

  Hadoop supports S3 by integrating with the S3 service through its public interface (in this code path, via the JetS3t client library); it does not implement an S3 filesystem of its own. Readers who are interested can trace through the source code themselves.
