storm是如何保证at least once语义的,stormleast


背景

本篇看看storm是通过什么机制来保证消息至少处理一次的语义的。

storm中的一些原语

这里写图片描述
要说明上面的问题,得先了解storm中的一些原语,比如:

spout、bolt、acker的关系

简单的关系如下所示:

这里写图片描述

上图展示了spout、bolts等形成了一个DAG,如何追踪这个DAG的执行过程,就是storm保证仅处理一次消息的语义的机制所在。

storm如何追踪消息(tuple)的处理

这里写图片描述

spout在调用emit/emitDirect方法发送tuple时,会以单播或者广播的方式,将消息发送给流的下游的component/task/bolt,如果配置了acker,那么会在每次emit调用之后,向acker发送请求ack的消息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向acker发送请求ack消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; rooted?表示是否设置了acker
(if (and rooted?
        (not (.isEmpty out-ids)))
 (do
   (.put pending root-id [task-id
                          message-id
                          {:stream out-stream-id :values values}
                          (if (sampler) (System/currentTimeMillis))])
   (task/send-unanchored task-data
                         ;;表示这是一个流初始化的消息
                         ACKER-INIT-STREAM-ID 
                         ;;将下游组件的out-id和0组成一个异或链,发送给acker用于追踪
                         [root-id (bit-xor-vals out-ids) task-id] 
                         overflow-buffer))

 ;; 如果没有配置acker,则调用自身的ack方法
 (when message-id
   (ack-spout-msg executor-data task-data message-id
                  {:stream out-stream-id :values values}
                  (if (sampler) 0) "0:")))

从上面的代码可以看出,每次emit tuple后,spout会向acker发送一个流ID为ACKER-INIT-STREAM-ID的消息,用于将DAG或者tuple-tree中的节点信息交给acker,acker会利用这个信息来追踪tuple-tree或DAG的完成。

而spout调用emit/emitDirect方法,将tuple发到下游的bolts,也同时会发送用于追踪DAG完成情况的信息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向流的下游emit消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(let [tuple-id (if rooted?
                ;; 如果有acker,tuple的MessageId会包含一个<root-id,id>的哈希表
                ;; root-id和id都是long型64位整数
                (MessageId/makeRootId root-id id)
                (MessageId/makeUnanchored))
     ;;实例化tuple
     out-tuple (TupleImpl. worker-context
                           values
                           task-id
                           out-stream-id
                           tuple-id)]

 ;; 发送至队列,最终发送给流的下游的task/bolt
 (transfer-fn out-task
              out-tuple
              overflow-buffer)
 ))

这个追踪信息是什么呢?

这里写图片描述

如果是spout -> bolt或者bolt -> bolt,这个信息就是tuple的MessageId,其内部维护一个哈希表:

// map anchor to id
private Map<Long, Long> _anchorsToIds;

键为root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者经过的边(bolt),但这里没有利用任何常规意义上的“树”的算法,而是采用异或的方式来存储这个值:

如果是spout -> acker,或者bolt -> acker,那么用于追踪的是tuple的values:

下面给出上面调用的bit-xor-vals和bit-xor方法的代码:

(defn bit-xor-vals
  [vals]
  (reduce bit-xor 0 vals))

(defn bit-xor
  "Bitwise exclusive or"
  {:inline (nary-inline 'xor)
   :inline-arities >1?
   :added "1.0"}
  ([x y] (. clojure.lang.Numbers xor x y))
  ([x y & more]
    (reduce1 bit-xor (bit-xor x y) more)))

示例

说起来有点抽象,看个例子。

假设我们有1个spout,n个bolt,1个acker:

1.spout

spout发送tuple到下游的bolts:

;; id_1是发送到bolt_1的tuple-id,依此类推
spout : 
  ->bolt_1 : id_1
  ->bolt_2 : id_2
  ..
  ->bolt_n : id_n

2.bolt

bolt收到tuple,在execute方法中进行必要的处理,然后调用emit方法,最后调用ack方法:

;; bolt_1调用emit方法,追踪消息的这样一个值:让id_1和bid_1按位进行异或.
;; bid_1和id_1类似,是个long型的64位随机整数,在emit这一步生成
  bolt_1 emit : id_1 ^ bid_1

;; bolt_1调用ack方法,并将值表达为如下方式的异或链的结果
  bolt_1 ack : 0 ^ bid_1 ^ id_1 ^ bid_1 = 0 ^ id_1

以上,可以看出bolt进行了emit-ack组合后,其自身在异或链中的作用消失了,也就是说tuple在此bolt得到了处理。

(当然,此时的ack还没有得到acker的确认,假设acker确认了,那么上面所说的tuple在bolt得到了处理就成立了。)

来看看acker的确认。

3.acker

acker收到来自spout的tuple:

;; spout发消息给acker,tuple的MessageId包含下面的异或链的结果
spout -> acker : 0 ^ id_1 ^ id_2 ^ .. ^ id_n

;; acker收到来spout的消息,对tuple的ackVal进行处理,如下所示:
acker : 0 ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_1 ^ id_2 ^ .. ^ id_n

acker收到来自bolt的tuple:

;; bolt_1发消息给acker:
bolt_1 -> acker : 0 ^ id_1

;; acker维护的对应此tuple的源spout的ackVal : 
ackVal : 0 ^ id_1 ^ id_2 ^ .. ^ id_n

;; acker进行确认,也就是拿上面的两个值进行异或:
acker : (0 ^ id_1) ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_2 ^ .. ^ id_n

可以看出,bolt_1向acker请求ack,acker收到请求ack,异或之后,id_1的作用消失。也就是说,bolt_1已处理完毕这个tuple。

所以,在acker看来,如果某个bolt的处理完成,则此bolt在异或链中的作用就消失了。

如果所有的bolt 都得到处理,那么acker将会观察到ackVal值变成了0:

ackVal = 0
= (0 ^ id_1) ^ (0 ^ id_1 ^ .. ^ id_n) ^ .. (0 ^ id_n) 
= (0 ^ 0) ^ (id_1 ^ id_1) ^ (id_2 ^ id_2) ^ .. ^ (id_n ^ id_n)

如果出现了ackVal = 0,说明两个可能:

如果ackVal不为0,说明tuple-tree或DAG没有完成。如果长时间不为0,通过超时,可以触发一个超时回调,在这个回调中调用spout的fail方法,来进行重放。

如此,就保证了消息处理不会漏掉,但可能会重复。

这里写图片描述

结语

以上,就是storm保证消息至少处理一次的语义的机制 。

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容