storm源码分析---State APIs,---stateapis


你已经看到实现有且只有一次被执行的语义时的复杂性。Trident这样做的好处把所有容错想过的逻辑都放在了State里面 -- 作为一个用户,你并不需要自己去处理复杂的txid,存储多余的信息到数据库中,或者是任何其他类似的事情。你只需要写如下这样简单的code
  TridentTopology topology = new  TridentTopology();          TridentState wordCounts =         topology.newStream("spout1", spout)           .each(new Fields("sentence"), new Split(), new  Fields("word"))           .groupBy(new Fields("word"))           .persistentAggregate(MemcachedState.opaque(serverLocations), new  Count(), new Fields("count"))                           .parallelismHint(6);  
所有管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此之外,数据库的更新会自动以batch的形式来进行以避免多次访问数据库。State的基本接口只包含下面两个方法:
  public interface State {       void beginCommit(Long txid); // can be null for things like  partitionPersist occurring off a DRPC stream       void commit(Long txid);  }  
当一个State更新开始时,以及当一个State更新结束时你都会被告知,并且会告诉你该次的txid。Trident并没有对你的state的工作方式有任何的假定。 假定你自己搭了一套数据库来存储用户位置信息,并且你想要在Trident中去访问它。则在State的实现中应该有用户信息的set、get方法:
  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocation(long userId, String location) {         // code to access database and set location       }          public String getLocation(long userId) {         // code to get location from database       }  }  
然后你还需要提供给Trident一个StateFactory来在Trident的task中创建你的State对象。LocationDB 的 StateFactory可能会如下所示:
  public class LocationDBFactory implements  StateFactory {      public State makeState(Map conf, int partitionIndex, int  numPartitions) {         return new LocationDB();     }    }  
Trident提供了一个QueryFunction接口用来实现Trident中在一个state source上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新statesource的功能。比如说,让我们写一个查询地址的操作,这个操作会查询LocationDB来找到用户的地址。下面以以怎样在topology中使用该功能开始,假定这个topology会接受一个用户id作为输入数据流:
  TridentTopology topology = new  TridentTopology();  TridentState locations =  topology.newStaticState(new LocationDBFactory());  topology.newStream("myspout",  spout)           .stateQuery(locations, new Fields("userid"), new  QueryLocation(), new Fields("location"))  
接下来让我们一起来看看QueryLocation 的实现应该是什么样的:
  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<String> ret = new ArrayList();           for(TridentTuple input: inputs) {               ret.add(state.getLocation(input.getLong(0)));           }           return ret;       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  
QueryFunction的执行分为两部分。首先Trident收集了一个batch的read操作并把他们统一交给batchRetrieve。在这个例子中,batchRetrieve会接受到多个用户id。batchRetrieve应该返还一个大小和输入tuple数量相同的result列表。result列表中的第一个元素对应着第一个输入tuple的结果,result列表中的第二个元素对应着第二个输入tuple的结果,以此类推。 你可以看到,这段代码并没有像Trident那样很好的利用batch的优势,而是为每个输入tuple去查询了一次LocationDB。所以一种更好的操作LocationDB方式应该是这样的:
  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocationsBulk(List<Long> userIds,  List<String> locations) {         // set locations in bulk       }          public List<String> bulkGetLocations(List<Long> userIds) {         // get locations in bulk       }  }  
接着,你可以这样改写上面的QueryLocation:
  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<Long> userIds = new ArrayList<Long>();           for(TridentTuple input: inputs) {               userIds.add(input.getLong(0));           }           return state.bulkGetLocations(userIds);       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  
通过有效减少访问数据库的次数,这段代码比上一个实现会高效的多。 如果你要更新State,你需要使用StateUpdater接口,下面是一个StateUpdater的例子,用来将新的地址信息更新到LocationDB当中。
  public class LocationUpdater extends  BaseStateUpdater<LocationDB> {       public void updateState(LocationDB state, List<TridentTuple>  tuples, TridentCollector collector) {           List<Long> ids = new ArrayList<Long>();           List<String> locations = new ArrayList<String>();           for(TridentTuple t: tuples) {               ids.add(t.getLong(0));               locations.add(t.getString(1));          }           state.setLocationsBulk(ids, locations);       }  }  
下面列出了你应该如何在Trident topology中使用上面声明的LocationUpdater:
  TridentTopology topology = new  TridentTopology();  TridentState locations =        topology.newStream("locations", locationsSpout)           .partitionPersist(new LocationDBFactory(), new  Fields("userid", "location"), new LocationUpdater())  
partitionPersist 操作会更新一个State,其内部是将State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操作。 在这段代码中,只是简单的从输入的tuple中提取出userid和对应的location,并一起更新到State中。 partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的location db。 然后你就可以使用这个state在topology的任何地方进行查询操作了。 同时,你也可以看到我们传了一个TridentCollector给StateUpdaters,collector发送的tuple就会去往一个新的stream。在这个例子中,我们并没有去往一个新的stream的需要,但是如果你在做一些事情,比如说更新数据库中的某个count,你可以emit更新的count到这个新的stream。然后你可以通过调用TridentState#newValuesStream方法来访问这个新的stream来进行其他的处理。
更多精彩内容请关注:http://bbs.superwu.cn 关注超人学院微信二维码: 关注超人学院java免费学习交流群:

相关内容