storm流分组,storm分组


流分组 
在设计一个topology的时候,你需要做的最重要的事情是定义数据在组件之间怎样交换(流怎样被bolts消费)。流分组指定了每个bolt消费哪些流和这些流被怎样消费。 
一个结点可以发射不止一条数据流。流分组允许我们选择接收哪些流。 
正如我们在第二章看到的,当topology被定义的时候流分组就被设置好了: 
... 
builder.setBolt("word-normalizer", new WordNormalizer()) 
.shuffleGrouping("word-reader"); ... 
在上述代码块中,在topology builder上设置了一个bolt,然后源被设置为shuffle分组。流分组通常使用源组件的ID作为参数,并且也会选择性的使用其他参数,取决于流分组的种类。 
每个InputDeclare可以有不止一个源,同时每个源可以用不同的流分组来分组。 
Shuffle分组 
Shuffle分组是最常用的分组方式。它使用一个参数(源组件),源组件会发射元组到一个随机
选择的bolt并确保每个消费者会收到等数量的元组。 
Shuffle分组对于做原子操作例如数学操作是很有用的。然而,如果操作不能被随机分布,就像第二章中的你需要计数单词的示例,你应用考虑使用其他的分组。 Fields分组 
Fields分组允许你基于元组的一个或多个域来控制元组怎样被发送到bolts。它确保一个联合域中给定的值集合总是会被送到相同的bolt。回到单词计数的示例,如果你通过单词域分组流,word-normalizer bolt总是会将元组和给定的单词一起发送到相同的word-counter bolt实例中。 
... 
builder.setBolt("word-counter", new WordCounter(),2) 
.fieldsGrouping("word-normalizer", new Fields("word")); ... 
fields分组中的所有的域必须在源组件中被声明。 
All分组 
All分组发送每个元组的一份单独拷贝到接收bolt的所有实例上。这种分组被用来向bolts发送信号。例如,如果你需要刷新缓存,你可以发送一个刷新缓存信号到所有的bolts。在单词计数的示例中,你可以通过使用all分组来增加清空计数器缓存的功能(见Topologies示例)。 
public void execute(Tuple input) { 
String str = null; 
try{ 
if(input.getSourceStreamId().equals("signals")){ 
str = input.getStringByField("action"); if("refreshCache".equals(str)) 
counters.clear(); 

}catch (IllegalArgumentException e) { 
//Do nothing } ... 

我们增加了一个条件判断来检查流的源。Storm给了我们声明命名的流的可能性(如果你不发送元组到一个命名的流,则流的名字是”default”);它是一个非常好的方式来确定元组的源,正如这个例子中我们需要确定信号一样。 
在topology定义中,你会增加另一个流到单词计数bolt来将元组从signals-spout 流发送到这个bolt的所有实例。 
builder.setBolt("word-counter", new WordCounter(),2) 
.fieldsGrouping("word-normalizer", new Fields("word")) .allGrouping("signals-spout","signals") 
Signals-spout的实现可以在git库中找到。 自定义分组 
你可以通过实现backtype.storm.grouping.CustomStreamGrouping接口来实现你的自定义流分组。这给了你决定每个元组将被哪个(些)bolt收到的权力。 
我们修改单词计数示例来对元组进行分组,这样的话相同字母开头的单词将被相同的bolt接收。 
public class ModuleGrouping implements CustomStreamGrouping, Serializable{ 
int numTasks = 0; @Override 
public List<Integer> chooseTasks(List<Object> values) { 
List<Integer> boltIds = new ArrayList(); if(values.size()>0){ 
String str = values.get(0).toString(); if(str.isEmpty()) 
boltIds.add(0); else 
boltIds.add(str.charAt(0) % numTasks); 

return boltIds; } 
@Override 
public void prepare(TopologyContext context, Fields outFields, 
List<Integer> targetTasks) { 
numTasks = targetTasks.size(); } 

你可以看到一个CustomStreamGrouping的简单实现,在这里我们使用任务的数量来对单词的第一个字符的整型值取模,由此来决定哪个bolt将接收这个元组。 要使用示例中分分组,按照下列方式修改word-normalizer分组。  
builder.setBolt("word-normalizer", new WordNormalizer()) 
.customGrouping("word-reader", new ModuleGrouping()); Direct分组 
这是一个由源决定哪个组件将接收元组的分组。与前一个示例类似,源将基于单词的第一个字母决定哪个bolt接收元组。为使用direct分组,在WordNormalizer中,使用emitDirect方法代替emit方法。 
public void execute(Tuple input) { 
... 
for(String word : words){ 
if(!word.isEmpty()){ 
... 
collector.emitDirect(getWordCountIndex(word),new Values(word)); } } 
// Acknowledge the tuple collector.ack(input); } 
public Integer getWordCountIndex(String word) { 
word = word.trim().toUpperCase(); if(word.isEmpty()) 
return 0; else 
return word.charAt(0) % numCounterTasks; 

在prepare方法中算出目标任务的数量: 
public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) { this.collector = collector; 
this.numCounterTasks = context.getComponentTasks("word-counter"); } 
在topology的定义中,指定流将被直接分组: 
builder.setBolt("word-counter", new WordCounter(),2) 
.directGrouping("word-normalizer"); 
Global分组 
Global分组将所有源实例产生的元组发送到一个单独的目标实例(特别地,ID最低的任务)中。 None分组在写这本著作的时候(storm版本0.7.1),使用这种分组与使用22页的”Shuffle分组”是一样的。换言之,当用这个分组时,流怎样分组是无所谓的。

更多精彩内容请关注:http://bbs.superwu.cn 

关注超人学院微信二维码:

相关内容