Storm应用系列之——Spout、Bolt API
Storm应用系列之——Spout、Bolt API
前言:
昨天有朋友聊天说,我写的前面三篇太简单了,没有太多深入的东西。好吧,这说明我的目的达到了。我写这个系列的原因就是为了面向应用,进一步细化为两点:
1. 以例子说话,由简入深,一步步了解如何在Storm上开发应用,不会读起来吃力;
2. 对于一些原理性的东西,不去过于深究,只要记住Storm是这样实现的,开发的时候加以利用或规避。
在明白了这些基础的东西以后,如果对于原理性的东西Storm是如何实现的感兴趣,可以再去看源代码也不迟。毕竟这部分对开发应用的帮助并不直接。我认为,不必每个用Storm的人都必须了解Storm底层是如何实现的,当然,我会尝试在适当的位置插入相关原理性解释的链接,有兴趣可以直接去看看。就此原因,我把标题改成“Storm应用系列”。
注:转帖请注明,原帖地址:
http://blog.csdn.net/xeseo/article/details/17750379
Component
Storm中,Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口全家普如下:
绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的,在以后的文章会具体讲解。
BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。
Spout
在前面基本例子中,我们实现了一个RandomSpout,来看看其类图
- Spout的最顶层抽象是ISpout接口。
结论:
Bolt
ExclaimBasicBolt的类图:这里可以看到一个奇怪的问题: 为什么IBasicBolt并没有继承IBolt? 我们带着问题往下看。
IBolt定义了三个方法:
- IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文
- cleanup 同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:
public class ExclaimRichBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { this.collector.emit(tuple, new Values(tuple.getString(0)+"!")); this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("after_excl")); } }修改topology
//builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout"); builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");运行下,结果一致。
结论: 通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);
评论暂时关闭