Storm Application Series: Reliability and the acker Mechanism
This article is part of an original series; please credit the source when reposting.
Source: http://blog.csdn.net/xeseo/article/details/17754825
Reliability
To see Storm's reliability mechanism at work, add ack and fail callbacks to the spout:
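```java
@Override
public void ack(Object msgId) {
    // Called by Storm once the tuple tree rooted at msgId has been fully processed
    System.err.println("ack " + msgId);
}

@Override
public void fail(Object msgId) {
    // Called by Storm when the tuple tree rooted at msgId fails or times out
    System.err.println("fail " + msgId);
}
```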
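Re-run ExclaimBasicTopo and check the output. Why does no ack or fail appear at all? The spout emitted its tuples without a messageId, so Storm had nothing to track. Modify RandomSpout so that every tuple is emitted with a messageId: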
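```java
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Storm 0.9.x package names; since Storm 1.0 these classes live under org.apache.storm
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random rand;
    private AtomicInteger counter;
    private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry",
            "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.rand = new Random();
        counter = new AtomicInteger();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(5000);
        String toSay = sentences[rand.nextInt(sentences.length)];
        int msgId = this.counter.getAndIncrement();
        toSay = "[" + msgId + "]" + toSay;
        PrintHelper.print("Send " + toSay);
        // Passing msgId as the second argument makes Storm track this tuple
        this.collector.emit(new Values(toSay), msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        PrintHelper.print("ack " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        PrintHelper.print("fail " + msgId);
    }
}
```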
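```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class PrintHelper {
    private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");

    // Prints a timestamp and the calling thread's name. SimpleDateFormat is not
    // thread-safe and spout/bolt tasks run on different threads, so access to the
    // shared formatter is serialized.
    public static synchronized void print(String out) {
        System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
    }
}
```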
The printed output:
```text
53:33:891 [Thread-26-spout] Send [0]ted:I'm excited
53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!
53:38:895 [Thread-26-spout] Send [1]edi:I'm happy
53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!
53:38:895 [Thread-26-spout] ack 0
53:43:896 [Thread-26-spout] Send [2]edi:I'm happy
53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!
53:43:896 [Thread-26-spout] ack 1
53:48:896 [Thread-26-spout] Send [3]edi:I'm happy
53:48:896 [Thread-26-spout] ack 2
53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!
53:53:896 [Thread-26-spout] Send [4]ted:I'm excited
53:53:896 [Thread-26-spout] ack 3
53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!
53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous
53:58:897 [Thread-26-spout] ack 4
53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!
```
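From this output we can see:
1. ack is called on the spout's own thread (Thread-26-spout), and tuples 0 to 4 are all acked.
2. ack n is printed only after Send [n+1]. Storm calls nextTuple, ack and fail on the same spout thread, so the 5-second Utils.sleep in nextTuple also delays the processing of acks.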
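Tuple tree
A tuple emitted by a spout forms a tuple tree together with every tuple that downstream bolts emit anchored to it. Storm considers the spout tuple fully processed only when every tuple in its tree has been acked.
Anchor
A bolt anchors a new tuple to its input by passing the input tuple as the first argument of emit, i.e. collector.emit(input, new Values(...)). The new tuple then becomes a child of the input in the tuple tree.
Tuple ack
A bolt reports that it has finished with an input tuple by calling collector.ack(input). Calling collector.fail(input) fails the whole tree immediately.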
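The three ideas above can be made concrete with a minimal pure-Java model that has no Storm dependency. TupleTreeModel and its methods are hypothetical names chosen for illustration. The model keeps the set of pending tuple ids for one spout tuple: anchoring adds a child id, acking removes an id, and the tree is complete when the set is empty. Storm's acker does not actually store the tree this way; it uses the XOR scheme in the acker workflow.

```java
import java.util.HashSet;
import java.util.Set;

public class TupleTreeModel {
    // Ids of tuples in the tree that have been emitted but not yet acked (hypothetical model).
    private final Set<Long> pending = new HashSet<>();
    private long nextId = 0;

    // The spout emits the root tuple of the tree.
    public long emitRoot() {
        long id = nextId++;
        pending.add(id);
        return id;
    }

    // A bolt emits a new tuple anchored to an input tuple: the child joins the tree.
    public long emitAnchored(long inputId) {
        long id = nextId++;
        pending.add(id);
        return id;
    }

    // A bolt acks a tuple: it leaves the tree.
    public void ack(long id) {
        pending.remove(id);
    }

    // The root is fully processed once no tuple in its tree is pending.
    public boolean fullyProcessed() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        TupleTreeModel tree = new TupleTreeModel();
        long root = tree.emitRoot();          // spout tuple
        long a = tree.emitAnchored(root);     // a bolt anchors two children to root
        long b = tree.emitAnchored(root);
        tree.ack(root);                       // then acks its input
        System.out.println(tree.fullyProcessed()); // false: a and b are still pending
        tree.ack(a);
        tree.ack(b);
        System.out.println(tree.fullyProcessed()); // true
    }
}
```

A bolt should emit its anchored children before acking the input. Otherwise the tree could briefly look complete before the children exist.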
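Acker
By now you can probably guess that the acker is the component Storm uses to track a tuple and guarantee that it is fully processed. The acker is itself just a bolt.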
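How the acker works:
1. On initialization, the spout is assigned a taskId.
2. The spout creates a new Tuple in nextTuple(); the Tuple's id is a random 64-bit number.
3. The spout emits the new Tuple (passing a messageId turns on tracking for it). At the same time it sends the acker a message containing the Tuple's id and:
   - the spout's taskId: once the whole tuple tree has been processed, the acker uses it to find the originating spout and call back ack or fail.
4. When a Bolt receives the Tuple and emits a new Tuple anchored to it, Storm records the new Tuple's list of anchors.
5. When the Bolt acks its input Tuple, Storm does the following:
   - XOR the id of the acked Tuple with the id of every new Tuple anchored to it. Suppose the Spout emitted a Tuple with id tuple-id-0 and the Bolt created a new Tuple with id tuple-id-1; then ack val = tuple-id-0 XOR tuple-id-1.
   - find the corresponding acker and send it the ack val computed above.
6. When the acker receives a new ack val, it XORs it into the value stored for the original Tuple's id. If the result is 0, the Tuple has been fully processed, and the acker uses the taskId to find the originating Spout and call its ack() method. In the example above, the stored value becomes tuple-id-0 XOR (tuple-id-0 XOR tuple-id-1) = tuple-id-1. It drops to 0 once the downstream Bolt acks tuple-id-1.
7. The Spout's fail method is called instead in two cases: a Bolt fails a Tuple in the tree, or the tree is not fully processed within the message timeout (topology.message.timeout.secs, 30 seconds by default).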
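The XOR bookkeeping in this workflow can be sketched as a single-threaded simulation. This is not Storm's acker source code. XorAckerSim, init, boltAckVal and receive are hypothetical names. Returning the spout's taskId stands in for "find the spout by taskId and call its ack()". Ids are random 64-bit longs, as the workflow describes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Random;

public class XorAckerSim {
    // What the acker keeps per spout tuple: the running XOR and the spout's taskId.
    private static final class Entry {
        long ackVal;
        final int spoutTaskId;

        Entry(long ackVal, int spoutTaskId) {
            this.ackVal = ackVal;
            this.spoutTaskId = spoutTaskId;
        }
    }

    private final Map<Long, Entry> pending = new HashMap<>();
    private final Random rand;

    public XorAckerSim(Random rand) {
        this.rand = rand;
    }

    // Every new tuple gets a random 64-bit id.
    public long newTupleId() {
        return rand.nextLong();
    }

    // Spout side: register the root tuple's id together with the spout's taskId.
    public void init(long rootId, int spoutTaskId) {
        pending.put(rootId, new Entry(rootId, spoutTaskId));
    }

    // Bolt side: XOR the acked input id with the ids of the tuples anchored to it.
    public static long boltAckVal(long inputId, long... anchoredChildIds) {
        long val = inputId;
        for (long child : anchoredChildIds) {
            val ^= child;
        }
        return val;
    }

    // Acker side: fold the ack val into the stored value. When it reaches 0 the
    // tree is complete and the spout taskId to call back is returned.
    public OptionalInt receive(long rootId, long ackVal) {
        Entry e = pending.get(rootId);
        if (e == null) {
            return OptionalInt.empty(); // unknown or already completed root
        }
        e.ackVal ^= ackVal;
        if (e.ackVal == 0) {
            pending.remove(rootId);
            return OptionalInt.of(e.spoutTaskId);
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        XorAckerSim acker = new XorAckerSim(new Random(42));
        long root = acker.newTupleId();
        acker.init(root, 7); // spout task 7 emits the root tuple

        long child1 = acker.newTupleId();
        long child2 = acker.newTupleId();
        // Bolt A emits child1 and child2 anchored to root, then acks root.
        System.out.println(acker.receive(root, boltAckVal(root, child1, child2))); // still open
        // Bolt B acks child2, Bolt C acks child1; the order does not matter for XOR.
        System.out.println(acker.receive(root, boltAckVal(child2))); // still open
        System.out.println(acker.receive(root, boltAckVal(child1))); // task 7 is called back
    }
}
```

XOR is commutative and x XOR x = 0, so acks can arrive in any order. The acker needs only one 64-bit value per spout tuple, however large the tree grows. The price is a negligible chance that unrelated random ids XOR to 0 early.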
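Unanchor
A Bolt can also emit a new Tuple without an anchor, e.g. collector.emit(new Values(...)). Such an unanchored Tuple is not added to the tuple tree. If it fails downstream, the spout is not notified and the original Tuple is not replayed.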
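A few lines of arithmetic show how unanchoring affects the XOR bookkeeping. ackerValueAfterBoltAck is a hypothetical helper that models one spout tuple and one Bolt hop; the id values are arbitrary.

```java
public class UnanchorDemo {
    // Hypothetical helper: the acker's stored value after one Bolt acks its input.
    // The spout registered rootId; an anchored child is XORed into the Bolt's ack
    // val, while an unanchored child is simply left out.
    public static long ackerValueAfterBoltAck(long rootId, long childId, boolean anchored) {
        long ackerVal = rootId;                         // registered by the spout
        long boltAckVal = anchored ? rootId ^ childId   // child joins the tree
                                   : rootId;            // child is not tracked
        return ackerVal ^ boltAckVal;
    }

    public static void main(String[] args) {
        long root = 0x1234_5678_9abc_def0L;
        long child = 0x0fed_cba9_8765_4321L;
        // Anchored: the tree stays open until the child is acked as well.
        System.out.println(ackerValueAfterBoltAck(root, child, true) == 0);  // false
        // Unanchored: the tree completes as soon as the Bolt acks its input,
        // so a later failure of the child cannot trigger a replay.
        System.out.println(ackerValueAfterBoltAck(root, child, false) == 0); // true
    }
}
```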
Adjusting reliability
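In some situations you may want to relax Storm's reliability. For example, you may not care whether data is lost, or you may want to check whether some downstream Bolt is slowing the Spout down. There are three ways to do this:
1. When building the topology, set the number of ackers to 0, i.e. conf.setNumAckers(0);
2. In the Spout, emit without a messageId, so that Storm cannot track the Tuple;
3. In the Bolt, emit new Tuples unanchored.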
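The three options can be summarized as a small pure-Java model. ReliabilityModes and its methods are hypothetical; they encode only the rules stated above plus one behavior described in the Storm documentation. With zero ackers, Storm calls the spout's ack() right after the tuple is emitted instead of tracking the tree.

```java
public class ReliabilityModes {
    // What the spout observes for one emitted tuple (hypothetical model).
    public enum SpoutCallback { ACK_OR_FAIL_AFTER_TRACKING, ACK_IMMEDIATELY, NONE }

    // Options 1 (no ackers) and 2 (no messageId) both switch tracking off.
    public static SpoutCallback spoutEmit(int numAckers, Object messageId) {
        if (messageId == null) {
            return SpoutCallback.NONE;            // ack/fail are never called
        }
        if (numAckers == 0) {
            return SpoutCallback.ACK_IMMEDIATELY; // acked right after emit, tree not tracked
        }
        return SpoutCallback.ACK_OR_FAIL_AFTER_TRACKING;
    }

    // Option 3: an unanchored Bolt emit leaves the new tuple out of the tree.
    public static boolean boltOutputTracked(boolean inputTracked, boolean anchored) {
        return inputTracked && anchored;
    }

    public static void main(String[] args) {
        System.out.println(spoutEmit(1, 42));               // ACK_OR_FAIL_AFTER_TRACKING
        System.out.println(spoutEmit(0, 42));               // ACK_IMMEDIATELY
        System.out.println(spoutEmit(1, null));             // NONE
        System.out.println(boltOutputTracked(true, false)); // false
    }
}
```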