Hadoop 2.x ResourceManager Startup: Starting the Services


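YARN ResourceManager Startup: Starting the Services
The ResourceManager (RM) is a composite service class that contains multiple services. All of the services are kept in a list and started one by one in a loop. The complete list of services is given at the end of this article.

Every service is started through the same sequence of steps:
1. serviceStart in ResourceManager.java calls the parent class's serviceStart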

    //ResourceManager.java
    protected void serviceStart() throws Exception {
			......
      super.serviceStart();
    }
2. The parent class's CompositeService.serviceStart
  protected void serviceStart() throws Exception {
    // Get the list of child services
    List<Service> services = getServices();
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": starting services, size=" + services.size());
    }
    // Start the services in a loop; each start() call eventually reaches that service's own serviceStart()
    for (Service service : services) {
      // start the service. If this fails that service
      // will be stopped and an exception raised
      service.start();
    }
    super.serviceStart();
  }
3. Enter start() in AbstractService, the parent class of CompositeService
public void start() {
    // Has the service already been started?
    if (isInState(STATE.STARTED)) {
      return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
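          // Record the service start time
          startTime = System.currentTimeMillis();
          // Start the service; this call enters the subclass's serviceStart()
          serviceStart();
          // Check whether the service actually reached the STARTED state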
          if (isInState(STATE.STARTED)) {
            //if the service started (and isn't now in a later state), notify
            if (LOG.isDebugEnabled()) {
              LOG.debug("Service " + getName() + " is started");
            }
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
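4. Finally, the subclass's own serviceStart function runs and starts the service
This shows how the service abstraction makes unified management convenient: a service added later only needs to follow this inheritance hierarchy to be managed in the same way.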
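The four steps above can be sketched without any Hadoop dependency. The classes below (SketchService, SketchCompositeService, RecordingService) are hypothetical stand-ins, not Hadoop's AbstractService and CompositeService; they only illustrate the shape of the pattern, in which start() owns the state handling and a subclass overrides nothing but serviceStart().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an abstract service base class.
abstract class SketchService {
    enum State { INITED, STARTED, STOPPED }

    private final String name;
    private State state = State.INITED;

    SketchService(String name) { this.name = name; }

    String getName() { return name; }

    synchronized State getState() { return state; }

    // Template method: state handling lives here, subclasses only supply serviceStart().
    public final synchronized void start() {
        if (state == State.STARTED) {
            return; // already started, nothing to do
        }
        state = State.STARTED;
        try {
            serviceStart();
        } catch (Exception e) {
            state = State.STOPPED; // a failed start leaves the service stopped
            throw new IllegalStateException("Failed to start " + name, e);
        }
    }

    // Default hook does nothing; concrete services override it.
    protected void serviceStart() throws Exception { }
}

// Hypothetical composite: starting it starts every child in registration order.
class SketchCompositeService extends SketchService {
    private final List<SketchService> children = new ArrayList<>();

    SketchCompositeService(String name) { super(name); }

    void addService(SketchService service) { children.add(service); }

    @Override
    protected void serviceStart() throws Exception {
        for (SketchService service : children) {
            service.start(); // ends up in the child's own serviceStart()
        }
        super.serviceStart();
    }
}

// Hypothetical leaf service that records its name when it is started.
class RecordingService extends SketchService {
    private final List<String> log;

    RecordingService(String name, List<String> log) {
        super(name);
        this.log = log;
    }

    @Override
    protected void serviceStart() { log.add(getName()); }
}
```

Adding another service in this sketch means writing one more subclass and registering it with addService; the composite's start logic does not change.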
Starting the token manager service threads

@Override
public void serviceStart() throws Exception {
  // Start the token managers (their exact roles are analyzed later); each manager is driven by a Timer
  amRmTokenSecretManager.start();
  containerTokenSecretManager.start();
  nmTokenSecretManager.start();


  try {
    // Start the thread that removes expired tokens
    rmDTSecretManager.startThreads();
  } catch(IOException ie) {
    throw new YarnRuntimeException("Failed to start secret manager threads", ie);
  }
  super.serviceStart();
}
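Ping Checker service: an inner class of AbstractLivelinessMonitor. It repeatedly walks the recorded list of NodeManagers; when a node has not reported for longer than the expiry interval, it is considered dead and is removed from the list.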
  private class PingChecker implements Runnable {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        synchronized (AbstractLivelinessMonitor.this) {
          // Get an iterator over the list of live NMs
          Iterator<Map.Entry<O, Long>> iterator = 
            running.entrySet().iterator();


          //avoid calculating current time everytime in loop
          long currentTime = clock.getTime();
          // Visit every node; a node that has not reported for longer than expireInterval
          // (set by yarn.nm.liveness-monitor.expiry-interval-ms, 10 minutes by default)
          // is considered dead and is removed
          while (iterator.hasNext()) {
            Map.Entry<O, Long> entry = iterator.next();
            if (currentTime > entry.getValue() + expireInterval) {
              iterator.remove();
              expire(entry.getKey());
              LOG.info("Expired:" + entry.getKey().toString() + 
                      " Timed out after " + expireInterval/1000 + " secs");
            }
          }
        }
        try {
          // Sleep for monitorInterval (expireInterval / 3)
          Thread.sleep(monitorInterval);
        } catch (InterruptedException e) {
          LOG.info(getName() + " thread interrupted");
          break;
        }
      }
    }
  }
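The expiry rule is easier to check when the clock is passed in explicitly. The following is a minimal sketch under that assumption; LivenessSketch, ping and checkOnce are hypothetical names, and the background thread and sleep of the real PingChecker are left out so that a single pass can be exercised on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, single-pass version of the expiry check described above.
class LivenessSketch<K> {
    private final long expireIntervalMs;
    private final Map<K, Long> running = new HashMap<>();

    LivenessSketch(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    // Record a heartbeat: remember when this key was last seen.
    synchronized void ping(K key, long nowMs) {
        running.put(key, nowMs);
    }

    // One pass of the checker: remove and return every key whose last ping is too old.
    synchronized List<K> checkOnce(long nowMs) {
        List<K> expired = new ArrayList<>();
        Iterator<Map.Entry<K, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (nowMs > entry.getValue() + expireIntervalMs) {
                K key = entry.getKey(); // read the key before removing the entry
                it.remove();
                expired.add(key);
            }
        }
        return expired;
    }

    synchronized int size() {
        return running.size();
    }
}
```

With a 600000 ms interval, a node last seen at time 0 expires on a pass at 700000 ms, while a node last seen at 500000 ms stays in the map.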
ResourceManager Event Processor service:
private final class EventProcessor implements Runnable {
  @Override
  public void run() {


    SchedulerEvent event;


    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        // Take the next event from the queue
        event = eventQueue.take();
      } catch (InterruptedException e) {
        LOG.error("Returning, interrupted : " + e);
        return; // TODO: Kill RM.
      }


      try {
        // Handle the event
        scheduler.handle(event);
      } catch (Throwable t) {
				.....
      }
    }
  }
}
ResourceTrackerService: an RPC server that implements the ResourceTracker interface and provides the registration and heartbeat services for NMs
//ResourceTrackerService.java
@Override
protected void serviceStart() throws Exception {
  super.serviceStart();
  // ResourceTrackerServer authenticates NodeManager via Kerberos if
  // security is enabled, so no secretManager.
  // Create the RPC server, which implements the ResourceTracker interface; the number of
  // handlers is set by yarn.resourcemanager.resource-tracker.client.thread-count
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  this.server =
    rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
        conf, null,
        conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 
            YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
  
  // Enable service authorization?
  if (conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
      false)) {
    refreshServiceAcls(conf, new RMPolicyProvider());
  }
  // Start the server
  this.server.start();
  conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
                         server.getListenerAddress());
}
Starting the RPC server components, mainly the responder, the listener, and the handlers:
public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
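The handler part of that start() method can be sketched as a fixed array of threads that all take work from one shared queue. This is a hypothetical illustration only: HandlerPoolSketch and its methods are invented names, the responder and listener are omitted, and calls are plain Runnable objects rather than RPC requests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: handlerCount threads draining one shared call queue.
class HandlerPoolSketch {
    private final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    private final Thread[] handlers;
    private volatile boolean running = true;

    HandlerPoolSketch(int handlerCount) {
        handlers = new Thread[handlerCount];
    }

    // Create and start every handler thread, mirroring the loop in start() above.
    synchronized void start() {
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(this::handlerLoop, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    // Each handler blocks on the queue and runs whatever call it receives.
    private void handlerLoop() {
        while (running) {
            try {
                callQueue.take().run();
            } catch (InterruptedException e) {
                return; // interrupted by stop()
            }
        }
    }

    void submit(Runnable call) {
        callQueue.add(call);
    }

    void stop() {
        running = false;
        for (Thread handler : handlers) {
            if (handler != null) {
                handler.interrupt();
            }
        }
    }

    // Demo helper: push callCount calls through handlerCount handlers and
    // return how many were handled before the timeout.
    static int demo(int handlerCount, int callCount) {
        HandlerPoolSketch pool = new HandlerPoolSketch(handlerCount);
        pool.start();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(callCount);
        for (int i = 0; i < callCount; i++) {
            pool.submit(() -> {
                handled.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.stop();
        return handled.get();
    }
}
```

A larger handler count only lets more calls run concurrently; the queueing logic stays the same.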
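The RPC server that serves clients: ClientRMService. Like ResourceTrackerService, it is an RPC server, and it implements the ApplicationClientProtocol interface. All RPC servers are started the same way; only the protocol they implement differs.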
@Override
protected void serviceStart() throws Exception {
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  // The number of handlers is set by yarn.resourcemanager.client.thread-count
  this.server =   
    rpc.getServer(ApplicationClientProtocol.class, this,
          clientBindAddress,
          conf, this.rmDTSecretManager,
          conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 
              YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
  
  // Enable service authorization?
  if (conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
      false)) {
    refreshServiceAcls(conf, new RMPolicyProvider());
  }
  
  this.server.start();
  clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
                                             server.getListenerAddress());
  super.serviceStart();
}
The RPC server that serves administrators: AdminService. The number of handlers is set by yarn.resourcemanager.admin.client.thread-count, and the service implements the ResourceManagerAdministrationProtocol interface.
  protected void startServer() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    this.server = (Server) rpc.getServer(
        ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
        conf, null,
        conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
		.......
    this.server.start();
    conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
        server.getListenerAddress());
  }
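Starting the AsyncDispatcher event handler service:
The call hierarchy is fairly deep, so only the key parts are covered here, beginning at the top of the call stack:
The AsyncDispatcher class extends AbstractService directly; when the service starts, it first calls the parent class's method of the same name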

@Override
protected void serviceStart() throws Exception {
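  // Call the parent's method of the same name; it currently does nothing, but global initialization could be placed there later
  super.serviceStart();
  // Create and start a new thread; the main logic lives in the Runnable returned by createThread()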
  eventHandlingThread = new Thread(createThread());
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}
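Next comes the body of the AsyncDispatcher thread, which is built by createThread above. The thread enters its main loop and waits on the event queue; as soon as a new event arrives, it calls dispatch(event) to send the event on to its handler.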
Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      // Check the stopped flag and the thread's interrupt status
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
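        // Checking the flag first keeps the lock and notify() off the normal path; they are only needed while draining before a stop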
        if (blockNewEvents) {
          synchronized (waitForDrained) {
            if (drained) {
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
          // Take the next event from the queue
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          // Dispatch the event
          dispatch(event);
        }
      }
    }
  };
}
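The take-then-dispatch loop above can be sketched as a single thread that looks up a handler by event type. Everything here is a hypothetical simplification: DispatcherSketch, its Event class and the string type tags are invented for illustration, and shutdown uses a sentinel event queued behind the pending ones instead of AsyncDispatcher's drained and blockNewEvents mechanism.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of a single-threaded asynchronous event dispatcher.
class DispatcherSketch {
    // Hypothetical event: a type tag plus a payload.
    static final class Event {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Sentinel queued by drainAndStop() to end the loop after pending events.
    private static final Event STOP = new Event("__stop__", "");

    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<Event>> handlers = new ConcurrentHashMap<>();
    private Thread eventHandlingThread;

    void register(String type, Consumer<Event> handler) {
        handlers.put(type, handler);
    }

    void post(Event event) {
        eventQueue.add(event);
    }

    void start() {
        eventHandlingThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Event event;
                try {
                    event = eventQueue.take(); // block until an event arrives
                } catch (InterruptedException ie) {
                    return;
                }
                if (event == STOP) {
                    return;
                }
                dispatch(event);
            }
        }, "DispatcherSketch event handler");
        eventHandlingThread.setDaemon(true);
        eventHandlingThread.start();
    }

    // Route the event to the handler registered for its type; unknown types are ignored.
    private void dispatch(Event event) {
        Consumer<Event> handler = handlers.get(event.type);
        if (handler != null) {
            handler.accept(event);
        }
    }

    // Queue the sentinel behind all pending events, then wait for the thread to exit.
    void drainAndStop() {
        eventQueue.add(STOP);
        try {
            eventHandlingThread.join(5000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because one thread consumes the queue, handlers run in the order the events were posted.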
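The RM runs quite a few kinds of services, and many of them are multi-threaded. An RPC server, for example, has 50 handlers by default, and there are several RPC servers. The overall list of services in the RM is as follows: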
Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService 
Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer 
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager 
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher 
Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService 
Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService 
Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService 
Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher 
Service Dispatcher in state Dispatcher: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.AdminService 

