storm基础框架分析,storm基础框架


背景

前期收到的问题:

1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?

2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?

上篇:storm是如何保证at least once语义的
回答了第2个问题。

本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架,并部分回答第一个问题。

worker、executor、task的关系

这里写图片描述

supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。

在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。

worker通过定期的更新connections信息,来获知其应该通讯的其它worker。

worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。
如果有新的tolopogy被提交到集群,nimbus会重新分配任务,这个后面会说到。

executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。

具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不一定等于实际运行中的数目。

如果计算出的总的executors超过了nimbus的限制,此topology将不会得到执行。

并行度的作用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算所有tolopogy的topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-topology->executors [nimbus storm-ids]
  "compute a topology-id -> executors map"
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base)
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map
         (map-val sort)
         (join-maps component->executors)
         (map-val (partial apply partition-fixed))
         (mapcat second)
         (map to-executor-id)
         )))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算topology的task-info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
    ;; 获取每个组件的并行数
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})
       ))

上述代码会在nimbus进行任务分配时调用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; nimbus进行任务分配
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
mk-assignments
->compute-new-topology->executor->node+port
->compute-topology->executors
-> ...

线程模型及消息系统

基本关系如下所示:

这里写图片描述

所谓本地发布,是指在worker进程内及executor线程间进行消息发布。
所谓远程发布,是指在worker进程间、不同的机器间进行消息发布。

任务调度及负载均衡

任务调度的主要角色

集群的状态机:

这里写图片描述

集群状态管理

集群的状态是通过一个storm-cluster-state的对象来描述的。
其提供了许多功能接口,比如:

如下图所示:

这里写图片描述

任务调度的依据

所以,nimbus会根据心跳、topologies信息及已分配的任务信息为依据,来重新分配任务,如下图所示:

这里写图片描述

任务调度的时机

topology提交过程

一个topology的提交过程:

主要过程如下图所示:

这里写图片描述

结语

以上,基本阐述了storm的基础框架,但未涉及trident机制,也基本回答了问题1。

终。

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容