Hadoop源码分析之NameNode的启动与停止


在前面几篇文章中已经分析完了IPC的调用的大致过程,接下来分析NameNode节点的工作过程

NameNode节点在HDFS文件系统中只有一个实例,它提供了与多个DataNode实例一起组成了HDFS系统。NameNode节点主要功能是维护着HDFS文件中两个重要的关系:

其中HDFS目录树、元信息和数据块索引等信息会持久化到磁盘上,保存在命名空间镜像和编辑日志中。数据块和数据节点的对应关系则在NameNode节点和DataNode节点启动后,由DataNode节点上报给NameNode节点。NameNode节点通过接收数据节点的注册、心跳和数据块提交等信息来管理DataNode节点。NameNode节点会通过发送数据块复制、删除、恢复等NameNode节点指令来操作数据块,此外,NameNode节点还为客户端对文件系统目录树的操作和对文件数据读写以及对HDFS系统进行管理提供了支持。

NameNode的启动

在org.apache.hadoop.hdfs.server.namenode.NameNode.java文件中有个main函数,也就是启动NameNode节点的入口函数。NameNode类的main函数中通过方法createNameNode()创建了一个NameNode对象,然后就调用NameNode.join()函数等待NameNode停止。main()函数代码如下:

public static void main(String argv[]) throws Exception {
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null)
        namenode.join();
    } catch (Throwable e) {
      LOG.error(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }
NameNode.join()函数就是调用NameNode.server成员变量的join方法,而NameNode.server成员变量是一个Hadoop IPC中讨论的RPC.Server对象,最终就调用了RPC.Server.join()方法。RPC.Server.join()方法也很简单,就是判断Server.runnning变量是否为true,如果running为true,说明这个IPC服务还在运行,就进入等待(wait()函数)。通过上面的分析可以知道,NameNode对象创建成功之后就可以为其他对象提供服务了,直到调用了NameNode的stop()方法,这部分代码比较简单,就不贴出来了。还有一点要注意的是NameNode的成员变量中有两个RPC.Server对象,一个是server,另一个是serviceRpcServer,其中server用于处理客户端的调用,serviceRpcServer用于处理HDFS内部的通信服务,如Datanode节点的调用。

NameNode类的main()方法中,最重要的就是这行代码,通过这行代码创建了一个NameNode对象。下面以createNameNode()方法为入口来分析NameNode节点的启动过程。createNameNode()函数的代码如下:

  public static NameNode createNameNode(String argv[], 
                                 Configuration conf) throws IOException {
    if (conf == null)
      conf = new Configuration();
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage();
      System.exit(-2);
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT:
        boolean aborted = format(conf, startOpt.getConfirmationNeeded(),
            startOpt.getInteractive());
        System.exit(aborted ? 1 : 0);
      case FINALIZE:
        aborted = finalize(conf, true);
        System.exit(aborted ? 1 : 0);
      case RECOVER:
        NameNode.doRecovery(startOpt, conf);
        return null;
      default:
    }
    DefaultMetricsSystem.initialize("NameNode");
    NameNode namenode = new NameNode(conf);
    return namenode;
  }

在createNameNode()函数中首先获取一个Configuration对象,用于加载启动NameNode节点所需的配置参数。然后就是调用parseArguments()函数来获取运行参数。如果运行参数有误,那么startOpt变量就为null,调用函数printUsage()打印出NameNode可接受的参数,再结束程序,printUsage()函数打印出的内容如下:


在第一次配置Hadoop环境的时候,都执行过命令,这里就是使用到了上面的[-format]参数。

parseArguments()函数的代码如下:

  private static StartupOption parseArguments(String args[]) {
    int argsLen = (args == null) ? 0 : args.length;
    StartupOption startOpt = StartupOption.REGULAR;
    for(int i=0; i < argsLen; i++) {
      String cmd = args[i];
      if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FORMAT;
        // check if there are other options
        for (i = i + 1; i < argsLen; i++) {
          if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
            startOpt.setConfirmationNeeded(false);
          }
          if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
            startOpt.setInteractive(false);
          }
        }
      } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.REGULAR;
      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.UPGRADE;
      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
        if (startOpt != StartupOption.REGULAR) {
          throw new RuntimeException("Can't combine -recover with " +
              "other startup options.");
        }
        startOpt = StartupOption.RECOVER;
        while (++i < argsLen) {
          if (args[i].equalsIgnoreCase(
                StartupOption.FORCE.getName())) {
            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
          } else {
            throw new RuntimeException("Error parsing recovery options: " + 
              "can't understand option \"" + args[i] + "\"");
          }
        }
      } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLBACK;
      } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FINALIZE;
      } else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.IMPORT;
      } else
        return null;
    }
    return startOpt;
  }
首先默认是将startOpt设置位StartupOption.REGULAR,StartupOption是一个枚举类型,由上面打印的Usage可以看出NameNode启动选项主要有5类,分别是format,upgrade,rollback,finalize和importCheckpoint,它的意义分别是:
  • format:格式化NameNode,建立NameNode节点的文件结构。带有format参数启动NameNode节点时,首先启动NameNode节点,然后对其机型格式化,再关闭节点,如果文件目录已经存在当前文件系统,则会提示用户。它有两个参数nonInteractive和force,nonInteractive表示如果NameNode节点的文件夹在当前的底层文件系统中存在,那么用户将不会收到提示,并且当前的格式化会失败,force表示不管NameNode的目录存不存在,强制格式化NameNode节点,也不会提示用户,如果nonInteractive和force参数同时存在,那么force参数将会被忽略;
  • upgrade:升级系统;
  • rollback:从升级系统中回滚到前一个版本,这个参数必须在停止的集群分布式文件系统中使用;
  • finalize:提交一次升级,使用这个参数将会删除前一个版本的文件系统,并且当前版本的文件系统将会变成所使用的文件系统,并且rollback参数将不会再有效,这个参数最终会停止NameNode。
  • importCheckpoint:从名字节点的一个检查点恢复,这个参数将会从检查点目录导入镜像文件,作为NameNode的文件目录结构。
  • recover:执行元数据恢复,对于这一点还不理解,HDFS已经可以使用importCheckpoint参数从SecondaryNameNode恢复元数据,为什么还需要一个recover的过程?http://stackoverflow.com/questions/17387477/namenode-recovery-how-does-namenode-recovery-works?answertab=active#tab-top这里说的Hadoop2.0开始可以有多个NameNode节点,

在startupOption类中,还有一个枚举值是REGULAR,它表示正常启动NameNode。StartupOption.REGULAR是parseArguments()方法的默认值,从方法中可以看出,方法会遍历程序运行时给出的所有参数,根据参数值设定startOpt的值,如果参数中第一个参数没有NameNode启动所期望的值,那么就返回null,然后就会打印出启动NameNode的方法。

通过parseArguments()方法获取到启动参数之后,就调用私有静态方法将启动方式设置到Congifuration对象中,随后就根据启动参数的值执行相应的方法,这里值针对format,finalize和recover的处理。先不管这些处理,看看NameNode对象的创建过程。这行代码是用于NameNode的运行时的数据统计的,暂时不考虑。下面就是执行NameNode构造方法创建一个NameNode对象。

NameNode的构造方法很简单,直接调用了方法initialize()方法,下面来分析initialize()方法。在这个方法中执行了一系列的成员变量初始化,主要的代码如下(删除了安全相关和度量相关的代码):

  private void initializea(Configuration conf) throws IOException {
	    InetSocketAddress socAddr = NameNode.getAddress(conf);
	    //Handler线程的个数
	    int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
	    this.namesystem = new FSNamesystem(this, conf);
	    // create rpc server,创建IPC服务器
	    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
	    if (dnSocketAddr != null) {
	      int serviceHandlerCount =
	        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
	                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
	      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
	          dnSocketAddr.getPort(), serviceHandlerCount,
	          false, conf, namesystem.getDelegationTokenSecretManager());
	      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
	      setRpcServiceServerAddress(conf);
	    }
	    this.server = RPC.getServer(this, socAddr.getHostName(),
	        socAddr.getPort(), handlerCount, false, conf, namesystem
	        .getDelegationTokenSecretManager());
	    // Set terse exception whose stack trace won't be logged
	    this.server.addTerseExceptions(SafeModeException.class);
	    
	    // The rpc-server port can be ephemeral... ensure we have the correct info
	    this.serverAddress = this.server.getListenerAddress(); 
	    FileSystem.setDefaultUri(conf, getUri(serverAddress));
	    LOG.info("Namenode up at: " + this.serverAddress);
	    //启动http服务器
	    startHttpServer(conf);
	    this.server.start();  //start RPC server   
	    if (serviceRpcServer != null) {
	      serviceRpcServer.start();      
	    }
	    startTrashEmptier(conf);//启动垃圾桶功能
	    /*可以通过HDFS提供的Service plugin来完成。Service plugin是HDFS提供给管理员的access point,
	     * 我们可以通过这个access point为HDFS提供一些扩展功能。同样地,创建一个关闭EditLog的plugin。 */
	    plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
	    for (ServicePlugin p: plugins) {
	      try {
	        p.start(this);
	      } catch (Throwable t) {
	        LOG.warn("ServicePlugin " + p + " could not be started", t);
	      }
	    }
	  }
首先获取RPC地址,读取xml文件中的属性dfs.namenode.rpc-address的值,如果所读到的值为空(不配置默认就为空),则使用fs.default.name属性的值替代,这个过程相关的代码很清晰。再读取属性dfs.namenode.handler.count的值,将其赋值给handlerCount变量,用于配置IPC服务中Handler线程的数量。然后创建FSNamesystem对象。接下来就创建并启动IPC服务器,在NameNode中有两个IPC服务器对象,分别是serviceRpcServer和server。其中如果配置了参数dfs.namenode.servicerpc-address,则会创建serviceRpcServer对象,如果没有配置dfs.namenode.servicerpc-address参数,但是需要创建serviceRpcServer对象,那么就使用dfs.namenode.rpc-address参数创建需要的地址,否则就不创建。serviceRpcServer和server的区别在于,当serviceRpcServer存在时,HDFS内部的IPC调用就使用serviceRpcServer对象来执行,客户端的IPC调用由server对象来执行,如果serviceRpcServer不创建,则所有的IPC调用都由server执行。因为客户端的IPC调用执行时间短,往往需要等待日志的持久化;而数据节点和第二名字节点调用的远程过程,执行逻辑比较复杂,执行时间也比较长。在一个繁忙的系统中,对客户端和HDFS的其他实体的IPC请求使用不同的IPC服务器,可以保证客户端的请求得到即使的响应。

接下来就调用startHttpServer()方法启动HTTP服务器,调用startTrashEmptier()方法启动垃圾桶功能,最后加载为NameNode配置的插件,由于编译HDFS代码之后就不能修改HDFS代码了,如果要给HDFS增加一些其他功能,可以通过HDFS提供的Service plugin来完成。Service plugin是HDFS提供给管理员的access point,所以可以通过这个access point为HDFS提供一些扩展功能(参考自http://langyu.iteye.com/blog/1165292)。

Reference

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》

https://hadoop.apache.org/docs/r1.2.1/commands_manual.html

http://langyu.iteye.com/blog/1165292

相关内容