HDFS DFSClient Source Code Analysis (2)


This article is not intended as a full walkthrough of the DFSClient source; that single class alone runs to nearly 4,000 lines, not counting the other classes it pulls in. Instead, the analysis of DFSClient is organized around the following error.

Flume 1.4.0 has recently been reporting:

04 Apr 2014 07:11:53,111 WARN  [ResponseProcessor for block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run:3015)  - DFSOutputStream ResponseProcessor exception  for block blk_326610323152553165_1164644java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.95.198.123:27691 remote=/10.95.198.22:60010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readLong(DataInputStream.java:416)
        at org.apache.hadoop.hdfs.protocol.DataTransferProtocol$PipelineAck.readFields(DataTransferProtocol.java:124)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:2967)


04 Apr 2014 07:11:53,112 WARN  [DataStreamer for file /user/hive/warehouse_ziyan/flume/woStoreSoftWDownload/20140404/.woStoreSoftWDownloadTotal.1396540800474.tmp block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError:3051)  - Error Recovery for block blk_326610323152553165_1164644 bad datanode[0] 10.95.198.22:60010
04 Apr 2014 07:11:53,112 WARN  [DataStreamer for file /user/hive/warehouse_ziyan/flume/woStoreSoftWDownload/20140404/.woStoreSoftWDownloadTotal.1396540800474.tmp block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError:3102)  - Error Recovery for block blk_326610323152553165_1164644 in pipeline 10.95.198.22:60010, 10.95.198.21:60010, 10.95.198.11:60010: bad datanode 10.95.198.22:60010

so I had to dig into DFSClient. DFSClient has no direct relationship with Flume, but Flume operates on FileSystem, and DistributedFileSystem in turn calls DFSClient, so the relationship is an indirect one.

To work around the error above, you can modify org.apache.flume.sink.hdfs.BucketWriter:

private void open() throws IOException, InterruptedException {
    if ((filePath == null) || (writer == null)) {
      throw new IOException("Invalid file settings");
    }


    final Configuration config = new Configuration();
    // disable FileSystem JVM shutdown hook
    config.setBoolean("fs.automatic.close", false);
    // add the following two lines
    config.set("dfs.socket.timeout","3600000");
    config.set("dfs.datanode.socket.write.timeout","3600000");
    // Hadoop is not thread safe when doing certain RPC operations,
    // including getFileSystem(), when running under Kerberos.
    // open() must be called by one thread at a time in the JVM.
    // NOTE: tried synchronizing on the underlying Kerberos principal previously
    // which caused deadlocks. See FLUME-1231.
    synchronized (staticLock) {
      checkAndThrowInterruptedException();


      try {
        long counter = fileExtensionCounter.incrementAndGet();


        String fullFileName = fileName + "." + counter;


        if (fileSuffix != null && fileSuffix.length() > 0) {
          fullFileName += fileSuffix;
        } else if (codeC != null) {
          fullFileName += codeC.getDefaultExtension();
        }
        // ... the rest of open() is unchanged and omitted here

In DFSClient, the defaults come from HdfsConstants: socketTimeout defaults to 1 minute and datanodeWriteTimeout to 8 minutes. socketTimeout is the one that matters here; the error above is the DFSOutputStream timing out while reading the pipeline ack for the block, so the ResponseProcessor gets no response and logs the exception.
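
For reference, here is a rough sketch of the relevant constants, based on the Hadoop 1.x-era HdfsConstants; the exact class location and field names can vary between versions:

  // HdfsConstants (Hadoop 1.x era), simplified sketch -- values as I understand them
  public static int READ_TIMEOUT = 60 * 1000;            // socketTimeout default: 1 minute
  public static int READ_TIMEOUT_EXTENSION = 3 * 1000;   // added per datanode in the pipeline
  public static int WRITE_TIMEOUT = 8 * 60 * 1000;       // datanodeWriteTimeout default: 8 minutes
  public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000;  // added per datanode when writing

If I read the code correctly, that also explains the 69000 millis in the log: with a three-datanode pipeline the ack read timeout works out to roughly 60000 + 3 * 3000 ms.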

Regarding this problem, I used to think the dfs.socket.timeout setting in the cluster's hdfs-site.xml would affect the DFSClient, but it does not; it only configures the datanodes. The DFSClient used by Flume reads its timeouts from the Configuration object built on the Flume side.
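
A hedged reconstruction of the lines in the DFSClient constructor that pick these values up (again Hadoop 1.x era, simplified); the point is that they are read from whatever Configuration the client hands in, so the cluster's hdfs-site.xml never reaches this code path:

    // DFSClient constructor (simplified sketch)
    this.socketTimeout = conf.getInt("dfs.socket.timeout",
                                     HdfsConstants.READ_TIMEOUT);           // 60 s default
    this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                            HdfsConstants.WRITE_TIMEOUT);   // 8 min default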

On the Flume side:

HDFSDataStream

  @Override
  public void open(String filePath) throws IOException {
    Configuration conf = new Configuration();
    Path dstPath = new Path(filePath);
    FileSystem hdfs = dstPath.getFileSystem(conf);
    if(useRawLocalFileSystem) {
      if(hdfs instanceof LocalFileSystem) {
        hdfs = ((LocalFileSystem)hdfs).getRaw();
      } else {
        logger.warn("useRawLocalFileSystem is set to true but file system " +
            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
      }
    }

In this way Flume indirectly shapes the DFSClient configuration: the Configuration built here is the one that eventually reaches DFSClient.
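
To make the chain concrete, here is a minimal standalone sketch (not Flume code; the class name, namenode URI and path are made up for illustration) that builds a Configuration the same way the BucketWriter patch above does and writes a file, so the larger timeouts ride along into DFSClient:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TimeoutConfDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same keys as in the BucketWriter patch above; values are milliseconds.
    conf.set("dfs.socket.timeout", "3600000");                 // client read timeout: 1 hour
    conf.set("dfs.datanode.socket.write.timeout", "3600000");  // client write timeout: 1 hour

    // FileSystem.get() hands this conf to DistributedFileSystem,
    // which passes it on to the DFSClient constructor (see below).
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);

    Path dst = new Path("/tmp/timeout-demo.txt");  // hypothetical path
    FSDataOutputStream out = fs.create(dst);
    out.writeBytes("hello");
    out.close();
  }
}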

On the Hadoop side, DistributedFileSystem passes that Configuration straight into DFSClient:

  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);


    String host = uri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
    }


    InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
    this.dfs = new DFSClient(namenode, conf, statistics);
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
  }

Hive and HBase can run into the same kind of problem as Flume. The following post covers the HBase case:

http://blog.csdn.net/wangqiaoshi/article/details/22900641
