JStorm之NimbusServer启动流程,jstormnimbusserver


   NimbusServer相当于hadoop里的JobTracker或yarn里的ResourceManager,在集群中属于首脑地位,负责分发任务,监控集群状态,与supervisor的通信主要通过Zookeeper。nimbus在启动过程中会做以下工作,以保证集群稳定运行:
1、清理无效topology
2、建立zk连接并创建相应znode
3、启动监控线程
4、启动httpserver
启动主函数如下:
public static void main(String[] args) throws Exception {
		// 读取配置文件,如果我们在JVM启动参数中包含了storm.conf.file系统属性
		//则会直接读取参数指定的文件,否则会找strom.yaml,这个一个yaml格式的文件
		//读取出的配置信息最终放入Map中
		@SuppressWarnings("rawtypes")
		Map config = Utils.readStormConfig();
		NimbusServer instance = new NimbusServer();
		//处理资源分配的工具
		INimbus iNimbus = new DefaultInimbus();
		//主要启动流程包含此函数中
		instance.launchServer(config, iNimbus);
	}
nimbus启动的主方法如下:
private void launchServer(final Map conf, INimbus inimbus) {
		LOG.info("Begin to start nimbus with conf " + conf);

		try {
			// 检测集群配置是本地模式还是集群模式
			StormConfig.validate_distributed_mode(conf);
			//清理pid目录,由参数storm.local.dir指定,该过程包含如下操作
			//1、创建本地进程的pid文件
			//2、删除历史pid文件,比如上次进程直接被kill
			createPid(conf);
			//初始化关闭钩子线程:里面包含各种清理操作,可参考cleanup方法
			initShutdownHook();
			//初始化ininumbus目录
			inimbus.prepare(conf, StormConfig.masterInimbus(conf));
			//创建数据容器,存放各种统计信息例如提交此俗、集群状态、启动时间,参考下面NimbusData类成员
			data = createNimbusData(conf, inimbus);
			//启动fllower线程不断更新zk状态,如果发现自己不是leader则会关闭进程
			initFollowerThread(conf);
			//启动http服务器
			int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
			hs = new Httpserver(port, conf);
			hs.start();
			//如果不是运行在yarn类似的容器中,不用关心此方法
			initContainerHBThread(conf);
		
			while (!data.isLeader())
				Utils.sleep(5000);
			
			//初始换监控线程
			initUploadMetricThread(data);
			//进行初始化操作,该方法会清理zk中残留的topology信息,初始化TopologyAssign线程负责分配任务
			init(conf);
		} catch (Throwable e) {
			LOG.error("Fail to run nimbus ", e);
		} finally {
			cleanup();
		}


		LOG.info("Quit nimbus");
	}
NimbusServer的init方法如下:
private void init(Map conf) throws Exception {


		NimbusUtils.cleanupCorruptTopologies(data);
		//初始化TopologyAssign线程
		initTopologyAssign();


		initTopologyStatus();
		//jar文件清理线程,默认每10分钟会清理一次,清理目录:$LOCAL-DIR/nimbus/inbox
		initCleaner(conf);


		serviceHandler = new ServiceHandler(data);


		if (!data.isLocalMode()) {
			//监控各task运行状态,发现dead状态的则会清理掉释放资源
			initMonitor(conf);
			//启动Thrift server
			initThrift(conf);


		}
	}
最后附上NimbusData的数据结构:
	public class NimbusData {


		private Map<Object, Object> conf;
	
		private StormClusterState stormClusterState;
	
		// Map<topologyId, Map<taskid, TkHbCacheTime>>
		private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;
	
		// TODO two kind of value:Channel/BufferFileInputStream
		private TimeCacheMap<Object, Object> downloaders;
		private TimeCacheMap<Object, Object> uploaders;
	
		private int startTime;
	
		private final ScheduledExecutorService scheduExec;
	
		private AtomicInteger submittedCount;
	
		private Object submitLock = new Object();
	
		private StatusTransition statusTransition;
	
		private static final int SCHEDULE_THREAD_NUM = 8;
	
		private final INimbus inimubs;
	
		private Map<String, Map<String, Map<ThriftResourceType, Integer>>> groupToTopology;
	
		private Map<String, Map<ThriftResourceType, Integer>> groupToResource;
	
		private Map<String, Map<ThriftResourceType, Integer>> groupToUsedResource;
		
		private final boolean localMode;
		
		private volatile boolean isLeader;
		
		...........	
	}


相关内容