How Hadoop 2.2.0 purges fsimage files and edit logs


The purgeOldStorage() method of the class org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager implements the retention logic for fsimage files and edit logs:

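1. Determine the smallest txid among the fsimage files to keep (minImageTxId), and delete every fsimage whose txid is smaller than it.

2. Compute purgeLogsFrom = max(0, minImageTxId + 1 - dfs.namenode.num.extra.edits.retained). Edit logs that only contain transactions older than purgeLogsFrom are candidates for deletion; the transactions between purgeLogsFrom and minImageTxId are kept as an extra cushion.

3. If the number of extra edit log segments kept by this cushion exceeds dfs.namenode.max.extra.edits.segments.retained, drop the oldest segments one by one (raising purgeLogsFrom past each of them) until the limit is met.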
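The arithmetic of the second step can be tried out with a few concrete numbers. The following is a minimal sketch, not Hadoop code: the class PurgeBoundaryExample and the method purgeLogsFrom are hypothetical names, and the parameter numExtraEditsToRetain is assumed to hold the value of dfs.namenode.num.extra.edits.retained.

```java
public class PurgeBoundaryExample {

    // Hypothetical helper that mirrors the boundary computation in purgeOldStorage().
    // numExtraEditsToRetain is assumed to come from dfs.namenode.num.extra.edits.retained.
    static long purgeLogsFrom(long minImageTxId, long numExtraEditsToRetain) {
        // fsimage_N covers everything up to and including N, so N + 1 is the
        // first transaction that is strictly required for recovery.
        long minimumRequiredTxId = minImageTxId + 1;
        // Keep a cushion of older transactions, but never go below txid 0.
        return Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
    }

    public static void main(String[] args) {
        // Oldest retained image is fsimage_5000, cushion of 1000 transactions.
        System.out.println(purgeLogsFrom(5000, 1000)); // prints 4001
        // The cushion is larger than the history, so nothing can be purged yet.
        System.out.println(purgeLogsFrom(500, 1000));  // prints 0
    }
}
```

In the first call, edit logs holding only transactions below 4001 may be purged; in the second, the boundary is clamped to 0 and every edit log is kept.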

Full code:

  public void purgeOldStorage() throws IOException {
    FSImageTransactionalStorageInspector inspector =
      new FSImageTransactionalStorageInspector();
    storage.inspectStorageDirs(inspector);

    long minImageTxId = getImageTxIdToRetain(inspector);
    purgeCheckpointsOlderThan(inspector, minImageTxId);
    // If fsimage_N is the image we want to keep, then we need to keep
    // all txns > N. We can remove anything < N+1, since fsimage_N
    // reflects the state up to and including N. However, we also
    // provide a "cushion" of older txns that we keep, which is
    // handy for HA, where a remote node may not have as many
    // new images.
    //
    // First, determine the target number of extra transactions to retain based
    // on the configured amount.
    long minimumRequiredTxId = minImageTxId + 1;
    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
   
    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
      @Override
      public int compare(EditLogInputStream a, EditLogInputStream b) {
        return ComparisonChain.start()
            .compare(a.getFirstTxId(), b.getFirstTxId())
            .compare(a.getLastTxId(), b.getLastTxId())
            .result();
      }
    });

    // Remove from consideration any edit logs that are in fact required.
    while (editLogs.size() > 0 &&
        editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
      editLogs.remove(editLogs.size() - 1);
    }

    // Next, adjust the number of transactions to retain if doing so would mean
    // keeping too many segments around.
    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
      purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
      editLogs.remove(0);
    }
   
    // Finally, ensure that we're not trying to purge any transactions that we
    // actually need.
    if (purgeLogsFrom > minimumRequiredTxId) {
      throw new AssertionError("Should not purge more edits than required to "
          + "restore: " + purgeLogsFrom + " should be <= "
          + minimumRequiredTxId);
    }
   
    purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
  }
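To see how the segment limit moves the purge boundary, the method above can be imitated on plain txid ranges. This is only a sketch under stated assumptions: Segment is a hypothetical stand-in for EditLogInputStream, the selection loop assumes that selectInputStreams() returns the segments containing transactions at or after purgeLogsFrom, and the sample segment layout is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SegmentCapExample {

    // Hypothetical stand-in for EditLogInputStream: only the txid range matters here.
    static final class Segment {
        final long firstTxId;
        final long lastTxId;

        Segment(long firstTxId, long lastTxId) {
            this.firstTxId = firstTxId;
            this.lastTxId = lastTxId;
        }
    }

    // Invented sample layout: five finalized segments of 100 transactions each.
    static List<Segment> sampleSegments() {
        List<Segment> segments = new ArrayList<>();
        for (long first = 1; first <= 401; first += 100) {
            segments.add(new Segment(first, first + 99));
        }
        return segments;
    }

    static long computePurgeLogsFrom(List<Segment> allSegments, long minImageTxId,
                                     long numExtraEditsToRetain, int maxExtraSegments) {
        long minimumRequiredTxId = minImageTxId + 1;
        long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

        // Assumption: keep the segments that still contain txids >= purgeLogsFrom.
        List<Segment> editLogs = new ArrayList<>();
        for (Segment s : allSegments) {
            if (s.lastTxId >= purgeLogsFrom) {
                editLogs.add(s);
            }
        }
        editLogs.sort(Comparator.comparingLong((Segment s) -> s.firstTxId)
                .thenComparingLong(s -> s.lastTxId));

        // Segments starting at or after minimumRequiredTxId are required, not "extra".
        while (!editLogs.isEmpty()
                && editLogs.get(editLogs.size() - 1).firstTxId >= minimumRequiredTxId) {
            editLogs.remove(editLogs.size() - 1);
        }

        // Too many extra segments: give up the oldest ones and move the boundary forward.
        while (editLogs.size() > maxExtraSegments) {
            purgeLogsFrom = editLogs.get(0).lastTxId + 1;
            editLogs.remove(0);
        }

        if (purgeLogsFrom > minimumRequiredTxId) {
            throw new AssertionError("Should not purge more edits than required");
        }
        return purgeLogsFrom;
    }

    public static void main(String[] args) {
        // Oldest retained image is fsimage_400, cushion of 250 transactions.
        System.out.println(computePurgeLogsFrom(sampleSegments(), 400, 250, 10)); // prints 151
        System.out.println(computePurgeLogsFrom(sampleSegments(), 400, 250, 2));  // prints 201
    }
}
```

With a generous segment limit the boundary stays at 151, so segments 101-200, 201-300 and 301-400 survive as the cushion. With a limit of 2, the oldest of the three (101-200) is given up and the boundary moves to 201.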
