Hadoop2.x NodeManager启动之服务初始化


  NM和RM类似,也是以服务的形式启动,但服务相对于RM来说少些,也经历服务初始化和服务启动两个阶段,NM继承自CompositeService。NM的服务初始化列表如下:


NM的启动入口
/**
 * Process entry point for the NodeManager.
 *
 * <p>Installs the uncaught-exception handler, logs the startup banner,
 * builds a NodeManager together with a YARN configuration, and finally
 * asks the NodeManager to initialize and start itself.
 *
 * @param args command-line arguments, forwarded to the startup message
 */
public static void main(String[] args) {
  // Install YARN's handler for exceptions that escape any thread.
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);

  // Create the NodeManager instance (not yet initialized).
  final NodeManager nm = new NodeManager();

  // Load the YARN configuration and pass it through setHttpPolicy.
  final Configuration yarnConf = new YarnConfiguration();
  setHttpPolicy(yarnConf);

  // Initialize and start; the second argument is hasToReboot.
  nm.initAndStartNodeManager(yarnConf, false);
}
下面进入initAndStartNodeManager函数,该函数主要包含两个功能:init和start
/**
 * Initializes this NodeManager with the given configuration and then starts it.
 *
 * <p>Any {@link Throwable} raised in the try block is logged at fatal level
 * and the JVM exits with status -1.
 *
 * <p>NOTE: this is an excerpt; the lines consisting of "....." mark code
 * that was omitted from the listing.
 *
 * @param conf configuration passed to {@code init}
 * @param hasToReboot not used in the visible part of this excerpt
 */
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
    try {
			.....
			.....
      this.init(conf);// initialize the service
      this.start();// start the service
    } catch (Throwable t) {
      LOG.fatal("Error starting NodeManager", t);
      System.exit(-1);
    }
  }
服务初始化最终进入NM的serviceInit函数,该函数比较清晰地描述了NM服务用到的子服务
/**
 * Creates the NodeManager's child services and registers them.
 *
 * <p>Services are registered through {@code addService} in the order shown
 * below; the node status updater is deliberately registered last. After all
 * registrations the method delegates to {@code super.serviceInit(conf)}.
 *
 * @param conf configuration used to build the child services
 * @throws Exception if building or initializing a service fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

  // Secret manager for container tokens.
  final NMContainerTokenSecretManager containerTokens =
      new NMContainerTokenSecretManager(conf);
  // Secret manager for NM tokens.
  final NMTokenSecretManagerInNM nmTokens = new NMTokenSecretManagerInNM();

  // Application ACL manager.
  this.aclsManager = new ApplicationACLsManager(conf);

  // Container executor: the implementation class is read from
  // NM_CONTAINER_EXECUTOR and falls back to DefaultContainerExecutor.
  final Class<? extends ContainerExecutor> launcherClass = conf.getClass(
      YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);
  final ContainerExecutor launcherExec =
      ReflectionUtils.newInstance(launcherClass, conf);
  try {
    launcherExec.init();
  } catch (IOException ioe) {
    throw new YarnRuntimeException("Failed to initialize container executor", ioe);
  }

  // File deletion service.
  final DeletionService deleter = createDeletionService(launcherExec);
  addService(deleter);

  // NodeManager level dispatcher (event dispatcher).
  this.dispatcher = new AsyncDispatcher();

  // Node health checker; it also supplies the directories handler.
  nodeHealthChecker = new NodeHealthCheckerService();
  addService(nodeHealthChecker);
  dirsHandler = nodeHealthChecker.getDiskHandler();

  this.context = createNMContext(containerTokens, nmTokens);

  // Node status updater: created here, registered at the very end.
  nodeStatusUpdater =
      createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);

  // Node resource monitor.
  final NodeResourceMonitor resourceMonitor = createNodeResourceMonitor();
  addService(resourceMonitor);

  // Container manager.
  containerManager = createContainerManager(
      context, launcherExec, deleter, nodeStatusUpdater,
      this.aclsManager, dirsHandler);
  addService(containerManager);
  ((NMContext) context).setContainerManager(containerManager);

  // Web server.
  final WebServer nmWebServer = createWebServer(
      context, containerManager.getContainersMonitor(),
      this.aclsManager, dirsHandler);
  addService(nmWebServer);
  ((NMContext) context).setWebServer(nmWebServer);

  // Register the event handlers, then register the dispatcher as a service.
  dispatcher.register(ContainerManagerEventType.class, containerManager);
  dispatcher.register(NodeManagerEventType.class, this);
  addService(dispatcher);

  // Metrics system.
  DefaultMetricsSystem.initialize("NodeManager");

  // StatusUpdater should be added last so that it get started last
  // so that we make sure everything is up before registering with RM.
  addService(nodeStatusUpdater);

  // Hand over to the parent class to initialize the services added above.
  super.serviceInit(conf);
  // TODO add local dirs to del
}
DeletionService服务:主要用于删除文件,该服务启动了一个线程池,默认核心数为4,有文件需要删除时会构建FileDeletionTask放入线程池
/**
 * Creates the scheduled thread pool held in {@code sched}.
 *
 * <p>With a non-null configuration the pool size is read from
 * {@code NM_DELETE_THREAD_COUNT} and {@code debugDelay} from
 * {@code DEBUG_NM_DELETE_DELAY_SEC} (default 0). With a null configuration
 * the default pool size is used and {@code debugDelay} is left untouched.
 *
 * @param conf service configuration; may be null
 * @throws Exception if the parent initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Worker threads are named "DeletionService #<n>".
  final ThreadFactory namedThreads = new ThreadFactoryBuilder()
      .setNameFormat("DeletionService #%d")
      .build();

  // Pick the pool size: configured value when conf is present, default otherwise.
  final int poolSize = (conf == null)
      ? YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT
      : conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
  sched = new ScheduledThreadPoolExecutor(poolSize, namedThreads);

  if (conf != null) {
    debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
  }

  sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
  sched.setKeepAliveTime(60L, SECONDS);
  super.serviceInit(conf);
}

ContainerManager:容器管理器,相关参数为yarn.nodemanager.sleep-delay-before-sigkill.ms和yarn.nodemanager.process-kill-wait.ms;它包含多个子服务,列表如下:


//ContainerManagerImpl.java
/**
 * Registers the log handler and computes how long to wait for containers
 * on shutdown.
 *
 * <p>The wait time is the sum of the configured sleep-delay-before-SIGKILL,
 * the configured process-kill wait, and {@code SHUTDOWN_CLEANUP_SLOP_MS}.
 *
 * @param conf service configuration
 * @throws Exception if the parent initialization fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  final LogHandler handler =
      createLogHandler(conf, this.context, this.deletionService);
  addIfService(handler);
  dispatcher.register(LogHandlerEventType.class, handler);

  final long sigkillDelayMs = conf.getLong(
      YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
      YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
  final long killWaitMs = conf.getLong(
      YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
      YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
  waitForContainersOnShutdownMillis =
      sigkillDelayMs + killWaitMs + SHUTDOWN_CLEANUP_SLOP_MS;

  // Delegate to the parent class; this is the initialization phase for the
  // child services registered so far (they are not started here).
  super.serviceInit(conf);
}
ContainerManager之ResourceLocalizationService:负责资源本地化,初始化时会清理并建立相应的本地目录,相关参数为yarn.nodemanager.localizer.cache.target-size-mb和yarn.nodemanager.localizer.cache.cleanup.interval-ms
/**
 * Prepares the local and log directories and reads the localizer settings.
 *
 * <p>Steps, in order: validate the configuration; create the public
 * resource tracker and the record factory; clean up the local directories;
 * create the usercache, filecache and nmPrivate directories under every
 * local dir; create every log dir; read cache size, cleanup period and the
 * localizer address; register the localizer tracker.
 *
 * @param conf service configuration
 * @throws Exception if validation or the parent initialization fails; an
 *     {@link IOException} during directory setup is rethrown as
 *     {@code YarnRuntimeException}
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  this.validateConf(conf);
  this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
  this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);

  try {
    final FileContext localFs = getLocalFileContext(conf);
    localFs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));

    // Clean up the local directories.
    cleanUpLocalDir(localFs, delService);

    // Create the initial directory layout under every local dir.
    for (String localRoot : dirsHandler.getLocalDirs()) {
      // $local/usercache
      localFs.mkdir(new Path(localRoot, ContainerLocalizer.USERCACHE), null, true);
      // $local/filecache
      localFs.mkdir(new Path(localRoot, ContainerLocalizer.FILECACHE), null, true);
      // $local/nmPrivate
      localFs.mkdir(new Path(localRoot, NM_PRIVATE_DIR), NM_PRIVATE_PERM, true);
    }

    // Create the log directories.
    for (String logRoot : dirsHandler.getLogDirs()) {
      localFs.mkdir(new Path(logRoot), null, true);
    }
  } catch (IOException ioe) {
    throw new YarnRuntimeException("Failed to initialize LocalizationService", ioe);
  }

  // Cache target size is configured in MB; the shift by 20 converts it to bytes.
  cacheTargetSize = conf.getLong(
      YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
  cacheCleanupPeriod = conf.getLong(
      YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
  localizationServerAddress = conf.getSocketAddr(
      YarnConfiguration.NM_LOCALIZER_ADDRESS,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);

  localizerTracker = createLocalizerTracker(conf);
  addService(localizerTracker);
  dispatcher.register(LocalizerEventType.class, localizerTracker);
  super.serviceInit(conf);
}
ContainerManager之containers-launcher:容器启动服务,初始化过程比较简单,如下
/**
 * Initializes the containers launcher.
 *
 * <p>The only work besides delegating to the parent is obtaining the local
 * file-system context; an unsupported file system is reported as a
 * {@code YarnRuntimeException}.
 *
 * @param conf service configuration
 * @throws Exception if the parent initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  try {
    //TODO Is this required?
    FileContext.getLocalFSFileContext(conf);
  } catch (UnsupportedFileSystemException unsupported) {
    throw new YarnRuntimeException("Failed to start ContainersLauncher", unsupported);
  }

  super.serviceInit(conf);
}
ContainerManager之containers-monitor:容器状态监控,主要是内存合理性方面的校验
/**
 * Reads the container-monitor settings and derives the memory limits.
 *
 * <p>Sets the monitoring interval, the resource calculator plugin, the
 * process-tree class, the maximum physical and virtual memory allotted to
 * containers, and the two memory-check flags. When the physical-memory check
 * is enabled and the node's physical memory can be determined, a warning is
 * logged if the configured container memory exceeds 80% of it.
 *
 * @param conf service configuration
 * @throws Exception if the parent initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Monitoring interval.
  this.monitoringInterval = conf.getLong(
      YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
      YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);

  // Resource calculator plugin class, read from
  // NM_CONTAINER_MON_RESOURCE_CALCULATOR (no default class is supplied).
  final Class<? extends ResourceCalculatorPlugin> pluginClass = conf.getClass(
      YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, null,
      ResourceCalculatorPlugin.class);
  this.resourceCalculatorPlugin =
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(pluginClass, conf);
  LOG.info(" Using ResourceCalculatorPlugin : "
      + this.resourceCalculatorPlugin);

  processTreeClass = conf.getClass(
      YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
      ResourceCalculatorProcessTree.class);
  this.conf = conf;
  LOG.info(" Using ResourceCalculatorProcessTree : "
      + this.processTreeClass);

  // Memory available to containers: configured in MB, converted to bytes.
  final long pmemBytes = conf.getLong(
      YarnConfiguration.NM_PMEM_MB,
      YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024L;

  // Setting these irrespective of whether checks are enabled. Required in
  // the UI.
  // ///////// Physical memory configuration //////
  this.maxPmemAllottedForContainers = pmemBytes;

  // ///////// Virtual memory configuration //////
  final float vmemPerPmem = conf.getFloat(
      YarnConfiguration.NM_VMEM_PMEM_RATIO,
      YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
  Preconditions.checkArgument(vmemPerPmem > 0.99f,
      YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
  this.maxVmemAllottedForContainers = (long) (vmemPerPmem * pmemBytes);

  // Flags that enable the physical / virtual memory checks.
  pmemCheckEnabled = conf.getBoolean(
      YarnConfiguration.NM_PMEM_CHECK_ENABLED,
      YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
  vmemCheckEnabled = conf.getBoolean(
      YarnConfiguration.NM_VMEM_CHECK_ENABLED,
      YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
  LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
  LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);

  if (pmemCheckEnabled) {
    // Ask the plugin (when there is one) for the node's physical memory;
    // a non-positive answer is treated as unknown and logged.
    long nodePmem = UNKNOWN_MEMORY_LIMIT;
    if (this.resourceCalculatorPlugin != null) {
      nodePmem = this.resourceCalculatorPlugin.getPhysicalMemorySize();
      if (nodePmem <= 0) {
        LOG.warn("NodeManager's totalPmem could not be calculated. "
            + "Setting it to " + UNKNOWN_MEMORY_LIMIT);
        nodePmem = UNKNOWN_MEMORY_LIMIT;
      }
    }

    // Warn when containers may use more than 80% of the physical memory.
    final boolean nodePmemKnown = nodePmem != UNKNOWN_MEMORY_LIMIT;
    if (nodePmemKnown
        && this.maxPmemAllottedForContainers > nodePmem * 0.80f) {
      LOG.warn("NodeManager configured with "
          + TraditionalBinaryPrefix.long2String(maxPmemAllottedForContainers,
              "", 1)
          + " physical memory allocated to containers, which is more than "
          + "80% of the total physical memory available ("
          + TraditionalBinaryPrefix.long2String(nodePmem, "",
              1) + "). Thrashing might happen.");
    }
  }

  super.serviceInit(conf);
}
NodeStatusUpdater服务:
/**
 * Reads the node's resource settings and the status-updater parameters.
 *
 * <p>Records the configured memory and virtual cores as the node's total
 * resource, adds that resource to the metrics, and reads the token
 * keep-alive flag, the token removal delay, the minimum ResourceManager
 * version and the duration for which stopped containers are tracked.
 *
 * @param conf service configuration
 * @throws Exception if the parent initialization fails; a
 *     {@code YarnException} is thrown when the configured tracking duration
 *     is negative
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Memory and virtual-core settings of this node.
  final int pmemMb = conf.getInt(
      YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
  final float vmemPerPmem = conf.getFloat(
      YarnConfiguration.NM_VMEM_PMEM_RATIO,
      YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
  // Only used in the log line at the end of this method.
  final int vmemMb = (int) Math.ceil(pmemMb * vmemPerPmem);
  final int vcores = conf.getInt(
      YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

  // Record the total resource and add it to the metrics.
  this.totalResource = Resource.newInstance(pmemMb, vcores);
  metrics.addResource(totalResource);

  this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
  this.tokenRemovalDelayMs = conf.getInt(
      YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

  this.minimumResourceManagerVersion = conf.get(
      YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
      YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);

  // Default duration to track stopped containers on nodemanager is 10Min.
  // This should not be assigned very large value as it will remember all the
  // containers stopped during that time.
  durationToTrackStoppedContainers = conf.getLong(
      YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 600000);
  if (durationToTrackStoppedContainers < 0) {
    final String problem = "Invalid configuration for "
        + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
        + "value is 10Min(600000).";
    LOG.error(problem);
    throw new YarnException(problem);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
        + durationToTrackStoppedContainers);
  }

  super.serviceInit(conf);
  LOG.info("Initialized nodemanager for " + nodeId + ":" +
      " physical-memory=" + pmemMb + " virtual-memory=" + vmemMb +
      " virtual-cores=" + vcores);
}

相关内容