Storm Application Series: Reliability and the Acker Mechanism


This article is part of an original series; please credit the source when reposting.

Reposted from: http://blog.csdn.net/xeseo/article/details/17754825

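Reliability

To see it at work, first override ack and fail in RandomSpout so that each callback prints the message ID: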

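    // Called back by Storm once the tuple tree rooted at the tuple emitted with msgId is fully processed
    @Override
    public void ack(Object msgId) {
        System.err.println("ack " + msgId);
    }

    // Called back by Storm when that tuple tree fails or times out
    @Override
    public void fail(Object msgId) {
        System.err.println("fail " + msgId);
    }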

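Rerun ExclaimBasicTopo and look at the output. No ack or fail shows up at all? That is because the spout emits its tuples without a message ID, so Storm has nothing to track. Modify RandomSpout so that every tuple is emitted together with a msgId: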

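// Package names are from Storm 0.9.x; Storm 1.0+ moved them to org.apache.storm
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private Random rand;

	// Source of increasing message IDs: 0, 1, 2, ...
	private AtomicInteger counter;

	private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		this.rand = new Random();
		counter = new AtomicInteger();
	}

	@Override
	public void nextTuple() {
		Utils.sleep(5000);
		String toSay = sentences[rand.nextInt(sentences.length)];
		int msgId = this.counter.getAndIncrement();
		toSay = "[" + msgId + "]" + toSay;
		PrintHelper.print("Send " + toSay);

		// Passing msgId as the second argument turns on tracking for this tuple;
		// Storm later calls ack(msgId) or fail(msgId) on this spout
		this.collector.emit(new Values(toSay), msgId);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	@Override
	public void ack(Object msgId) {
		PrintHelper.print("ack " + msgId);
	}

	@Override
	public void fail(Object msgId) {
		PrintHelper.print("fail " + msgId);
	}

}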

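import java.text.SimpleDateFormat;
import java.util.Date;

public class PrintHelper {

	// SimpleDateFormat is not thread-safe, and the spout and bolt threads
	// all call print() concurrently, so print() is synchronized
	private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");

	// Prefixes each message with the time (min:sec:ms) and the calling thread's name
	public static synchronized void print(String out) {
		System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
	}

}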


Here is the printed output:

53:33:891 [Thread-26-spout] Send [0]ted:I'm excited
53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!
53:38:895 [Thread-26-spout] Send [1]edi:I'm happy
53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!
53:38:895 [Thread-26-spout] ack 0
53:43:896 [Thread-26-spout] Send [2]edi:I'm happy
53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!
53:43:896 [Thread-26-spout] ack 1
53:48:896 [Thread-26-spout] Send [3]edi:I'm happy
53:48:896 [Thread-26-spout] ack 2
53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!
53:53:896 [Thread-26-spout] Send [4]ted:I'm excited
53:53:896 [Thread-26-spout] ack 3
53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!
53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous
53:58:897 [Thread-26-spout] ack 4
53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!

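The output clearly shows that every tuple is now acked, and that ack() runs on the spout's own thread (Thread-26-spout). Because that same thread also runs nextTuple(), which starts with Utils.sleep(5000), the ack for tuple N is only printed after tuple N+1 has been sent.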
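Storm runs nextTuple(), ack() and fail() on the same spout thread, which explains the ordering in the log above. The sketch below is a simplified single-threaded model of that loop, not Storm's executor code: the class SpoutLoopModel and its two queues are hypothetical, and it assumes each ack arrives while the spout is blocked in the following nextTuple() call (here, during the 5-second sleep).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model (an assumption, not Storm's source): one thread runs the spout loop,
// calling nextTuple() and then handling whatever ack messages have arrived.
class SpoutLoopModel {
    private final Queue<Integer> inFlight = new ArrayDeque<>(); // acks still travelling through bolt/acker
    private final Queue<Integer> arrived = new ArrayDeque<>();  // acks waiting for the spout thread
    private int counter = 0;
    final List<String> log = new ArrayList<>();

    // Stands in for RandomSpout.nextTuple(): while the thread "sleeps", acks for
    // earlier tuples arrive, but nobody can process them until nextTuple() returns.
    private void nextTuple() {
        while (!inFlight.isEmpty()) {
            arrived.add(inFlight.poll()); // arrival during Utils.sleep(5000)
        }
        int msgId = counter++;
        log.add("Send " + msgId);
        inFlight.add(msgId); // the bolt processes it; its ack is now on the way back
    }

    // Each iteration: call nextTuple(), then deliver ack() for every arrived message.
    void runLoop(int iterations) {
        for (int i = 0; i < iterations; i++) {
            nextTuple();
            while (!arrived.isEmpty()) {
                log.add("ack " + arrived.poll());
            }
        }
    }
}
```

Running three iterations reproduces the pattern Send 0, Send 1, ack 0, Send 2, ack 1 seen in the log.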

Tuple tree

Anchor
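A minimal sketch of the tuple tree, assuming a hypothetical TupleNode class rather than any Storm type: the spout's tuple is the root, and each tuple a bolt emits anchored to an input tuple (in Storm, via OutputCollector.emit(anchor, values)) is attached under that input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a tuple tree: a spout tuple is the root, and every tuple a bolt
// emits anchored to an input tuple becomes a child of that input.
class TupleNode {
    final String value;
    final List<TupleNode> children = new ArrayList<>();

    TupleNode(String value) {
        this.value = value;
    }

    // Models an anchored emit: the new tuple is attached under its anchor,
    // so it joins the same tree as the anchor.
    TupleNode emitAnchored(String childValue) {
        TupleNode child = new TupleNode(childValue);
        children.add(child);
        return child;
    }

    // Number of tuples in this (sub)tree; all of them must be acked
    // before the root counts as fully processed.
    int treeSize() {
        int n = 1;
        for (TupleNode c : children) {
            n += c.treeSize();
        }
        return n;
    }
}
```

For example, the spout tuple "[0]ted:I'm excited" with one anchored child "[0]ted:I'm excited!" forms a tree of two tuples, and both must be acked before the spout's ack() is called.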

Acking a tuple

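This is the job of the acker. By now you can probably guess that the acker is the mechanism Storm uses to track a tuple and guarantee that it is fully processed. The acker itself runs inside the topology as a special bolt.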

How the acker works:

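When the spout task is initialized, it is assigned a taskId;

In nextTuple(), the spout creates a new tuple whose ID is a random 64-bit number;

The spout emits the new tuple (supplying a messageId turns on tracking for it) and, at the same time, sends the acker the tuple's ID together with:

  • The spout's taskId: used by the acker to find the original spout and call back ack or fail once the whole tuple tree has been processed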
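The spout-side steps above can be sketched as follows. SpoutTracker is a hypothetical class, not Storm's implementation; it assumes the spout keeps a map from each random 64-bit root ID back to the user's msgId, so that ack(msgId) and fail(msgId) can later be called with the ID originally passed to emit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical spout-side bookkeeping (not Storm's actual code): each emit with a msgId
// gets a random 64-bit root id, and the spout remembers which msgId it belongs to.
class SpoutTracker {
    final int taskId; // assigned when the spout task is initialized
    private final Random random;
    private final Map<Long, Object> pending = new HashMap<>(); // root id -> user msgId

    SpoutTracker(int taskId, long seed) {
        this.taskId = taskId;
        this.random = new Random(seed);
    }

    // Returns the new root id; in the described protocol the spout would also
    // send (rootId, taskId) to an acker at this point.
    long emit(Object msgId) {
        long rootId = random.nextLong(); // random 64-bit tuple id
        pending.put(rootId, msgId);
        return rootId;
    }

    // Called when the acker reports that the tree rooted at rootId is complete;
    // returns the msgId to pass to ack(), or null if the root is unknown.
    Object onComplete(long rootId) {
        return pending.remove(rootId);
    }

    int pendingCount() {
        return pending.size();
    }
}
```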

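After a bolt receives and processes a tuple, if it emits a new tuple anchored to it, Storm maintains the list of anchor tuples for that new tuple;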

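When the bolt acks the input tuple, Storm does the following:

  • XOR together the ID of every tuple in the anchor list that has been acked and the ID of every newly created tuple. Suppose the spout emitted a tuple with ID tuple-id-0 and the bolt created a new tuple with ID tuple-id-1; then ack val = tuple-id-0 XOR tuple-id-1
  • Send the ack val obtained from the XOR above to the acker responsible for the original tuple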
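A worked version of the XOR step, written as a hypothetical helper AckVal.ackVal rather than Storm code: the ack val is the acked tuple's ID XORed with the IDs of all tuples newly anchored to it. Because x XOR x = 0, every ID that reaches the acker once when it is created and once more when it is acked cancels out.

```java
// Hypothetical helper (a sketch, not Storm's code): the "ack val" a bolt's ack contributes.
final class AckVal {
    private AckVal() {}

    // XOR of the acked input tuple's id with the ids of every tuple newly anchored to it.
    static long ackVal(long ackedId, long... newAnchoredIds) {
        long val = ackedId;
        for (long id : newAnchoredIds) {
            val ^= id;
        }
        return val;
    }
}
```

With 4-bit IDs for readability: the spout registers tuple-id-0 = 1010; the bolt acks it while emitting tuple-id-1 = 0110, sending 1010 XOR 0110 = 1100; the acker's value becomes 1010 XOR 1100 = 0110; when tuple-id-1 is later acked with no new tuples, 0110 XOR 0110 = 0000 and the tree is complete.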

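When the acker receives a new ack val, it XORs it with the value it has saved for the original tuple (initially that tuple's ID). If the result is 0, the tuple tree has been fully processed, so the acker uses the saved taskId to find the original spout and calls its ack() method.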
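The acker's side, as a simplified model that follows the description above (AckerModel is a hypothetical class; Storm's real acker differs in detail and also handles failures): for each root ID it stores the spout's taskId and a running XOR value, and reports the taskId to call back once that value reaches 0.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified acker model (an assumption, not Storm's acker): per root tuple id it keeps
// the spout's taskId and a running XOR of everything received so far.
class AckerModel {
    private static final class Entry {
        long val;
        final int spoutTaskId;

        Entry(long val, int spoutTaskId) {
            this.val = val;
            this.spoutTaskId = spoutTaskId;
        }
    }

    private final Map<Long, Entry> pending = new HashMap<>();

    // Sent by the spout when it emits a tracked tuple: the value starts as the root id.
    void init(long rootId, int spoutTaskId) {
        pending.put(rootId, new Entry(rootId, spoutTaskId));
    }

    // Sent on every bolt ack. Returns the spout taskId to call back once the running
    // XOR reaches 0 (tree fully processed), or -1 while tuples are still outstanding.
    int ack(long rootId, long ackVal) {
        Entry e = pending.get(rootId);
        if (e == null) {
            return -1; // unknown or already completed root
        }
        e.val ^= ackVal;
        if (e.val == 0) {
            pending.remove(rootId);
            return e.spoutTaskId;
        }
        return -1;
    }
}
```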

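Failure is handled in the same spirit: if any tuple in the tree is failed, or the tree is not fully processed within the message timeout, the original spout's fail() method is called instead.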
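A hypothetical sketch of the failure path (FailTracker is not a Storm class): a tracked root fails either explicitly or because it is still pending when its deadline passes. In Storm the deadline comes from the topology.message.timeout.secs setting (Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30 seconds by default; here the timeout is simply a constructor argument in milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the failure path: explicit fail or timeout both lead to fail().
class FailTracker {
    private final long timeoutMillis;
    private final Map<Long, Long> deadlines = new HashMap<>(); // root id -> deadline

    FailTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void track(long rootId, long nowMillis) {
        deadlines.put(rootId, nowMillis + timeoutMillis);
    }

    // Tree completed: stop tracking, no fail callback.
    void complete(long rootId) {
        deadlines.remove(rootId);
    }

    // Explicit fail from a bolt: true if the spout's fail() should be called.
    boolean fail(long rootId) {
        return deadlines.remove(rootId) != null;
    }

    // Removes and returns roots whose deadline has passed; each would trigger fail().
    List<Long> expire(long nowMillis) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getValue() <= nowMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```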

Unanchor
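In Storm, an unanchored emit is a call such as collector.emit(new Values(...)) that passes no input tuple, so the new tuple does not join any tuple tree. The hypothetical AnchorModel below (a model, not Storm code) shows the consequence: a failure of an anchored tuple reaches the spout, while a failure of an unanchored one does not.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: an anchored emit keeps the new tuple in a tracked tree;
// an unanchored emit creates an untracked tuple whose failure never reaches the spout.
class AnchorModel {
    private final Set<String> trackedTuples = new HashSet<>(); // tuples belonging to a tracked tree

    // A spout emit with a messageId starts a tracked tree.
    void spoutEmit(String id) {
        trackedTuples.add(id);
    }

    // A bolt emit; only an anchored emit from a tracked parent extends the tree.
    void emit(String parentId, String childId, boolean anchored) {
        if (anchored && trackedTuples.contains(parentId)) {
            trackedTuples.add(childId);
        }
    }

    // True if failing this tuple would cause the spout's fail() to be called.
    boolean failReachesSpout(String id) {
        return trackedTuples.contains(id);
    }
}
```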

Adjusting reliability

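In some situations you may want to adjust Storm's reliability. For example, you may not care whether data is lost, or you may want to check whether some downstream bolt is slowing the spout down. There are three ways to do this:
1. When building the topology, set the number of ackers to 0, i.e. conf.setNumAckers(0);
2. In the spout, do not specify a messageId, so that Storm cannot track the tuple;
3. In a bolt, emit new tuples unanchored.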
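A hypothetical model of what the spout observes under options 1 and 2 (ReliabilityModel and SpoutCallback are illustrative names, not Storm API). It assumes Storm's documented behaviour that, with zero ackers, a tuple emitted with a messageId is acked as soon as it leaves the spout. Option 3 is not modelled here because it only affects the tuples downstream of the unanchored emit.

```java
// Hypothetical summary of what the spout sees for one emitted tuple (not Storm code).
enum SpoutCallback { ACK_IMMEDIATELY, NONE, TRACKED }

final class ReliabilityModel {
    private ReliabilityModel() {}

    static SpoutCallback onEmit(int numAckers, Object msgId) {
        if (msgId == null) {
            return SpoutCallback.NONE;            // option 2: no messageId, nothing to call back
        }
        if (numAckers == 0) {
            return SpoutCallback.ACK_IMMEDIATELY; // option 1: no ackers, ack() right after emit
        }
        return SpoutCallback.TRACKED;             // normal case: ack()/fail() once the tree resolves
    }
}
```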
