Storm0.8.1下的并发概念、模型、设置和实践


前言

上一篇文章《用java编写Storm0.8.1程序实例详解》中,我们完成了一个完整的Storm0.8.1下的应用的设计和实现。在其中组装Topology的时候,有几个部分代码涉及到Storm的Topology并发执行的参数,当时没有仔细的讲,因为这部分涉及到Storm的并发模型,需要单独讨论。这里专门来讨论Storm的并发模型。

概念说明

storm并发模型有三个概念,分别是Worker,Executor,Task。根据官方的说明和我的理解。Worker是执行一个Topology子集的一个进程,他运行在Storm集群的某台supervisor机器上。根据之前的文章《centos6.4 下storm-0.8.1 安装记录》,我们可以知道,一个supervisor机器上可以运行的worker的数量取决于这台机器的STORM_HOME/conf/storm.yaml文件中的配置项supervisor.slots.ports的内容,在该配置项中配置了多少个端口号,该机器就可以运行多少个Worker进程。可以根据机器的性能调整这个数量,性能好的机器多设置几个端口号,性能差的机器少设置几个端口号。整个Storm集群可运行的worker的数量是所有集群中supervisor可运行的worker的数量的和。如果在supervisor的storm.yaml文件中没有找到到该设置,则Storm会从storm的jar包中的default.xml中读取。默认的配置是

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

而Executor是在某个Worker生成一个或者多个线程。Task则是某个Executor线程发起的执行实际数据处理过程的任务。我的理解是在Storm中会启动1个或者多个Worker进程来执行某个Topology,每个worker进程生成一个或者多个executor线程来处理摸个Topology组件(Spout或者Bolt)的任务,每个executor启动一个或者多个Task执行实际的数据处理任务。其中一个executor和Topology的组件是1对多的关系也就是说一个Topology组件可能会有多个executor来执行,一个executor只会处理一个组件的任务。而一个Task则对应了你实现的Spout或者Bolt类的一个实例,来具体执行数据处理逻辑。每个worker和Topology的关系也是1对多的关系,即一个Topology可能运行在一个或者多个worker上,但是一个worker只会为一个topology服务。官方的概念图如下所示




设置配置

说完概念后,该说如何在你的topology中设置这些值了。Storm设置这些值得方法有两个途径,

第一种是在代码中设置。在前面的文章《基于Storm0.8.1用java语言开发一个完整的流式计算程序》中,我们在topology组装的时候有下面的代码


            int worknum = Integer.parseInt(args[0]);
            int execunum = Integer.parseInt(args[1]);
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum).setNumTasks(execunum * 2);
            builder.setBolt("saveMobileNetlog", new MobileNetLogSaveBolt(), execunum).setNumTasks(execunum * 2).shuffleGrouping("analyseMobileNetlog");
            builder.setBolt("countMobileNetlog", new MobileNetLogStatisticsBolt(), execunum).setNumTasks(execunum * 2).fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD));
            builder.setBolt("thresholdMobileNetlog", new MobileNetLogThresholdBolt(), execunum).setNumTasks(execunum * 2).shuffleGrouping("countMobileNetlog");
            Config conf = new Config();
            conf.setNumWorkers(worknum);

这里面涉及到如何设置Storm的Workers,每个组件的Executors和Tasks的具体代码。其中conf.setNumWorkers(worknum)语句设置本Topology运行的最大的worker的数量。builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum).setNumTasks(execunum * 2);语句中的execunum和execunum * 2参数设置的是这个计算单元的Executors数量和Tasks数量。这里我用代码吧Tasks的数量设置为Executors的2倍。

第二种方式是在配置文件中配置这些数值。配置文件默认生效的优先级是default.yaml<storm.yaml。default.yaml在storm的jar包中,storm.yaml文件在STORM_HOME/conf文件夹中。default.yaml相关的配置项如下所示

topology.workers: 1
topology.tasks: null

可见,默认的workers的值是1,没有找到默认的executors的配置项,根据测试,默认值应该是1,而tasks的默认设置为null,实际中,他的默认值和executors的值是保持一致的。因为这几个值一般我会在程序中自己来设置,所以没有仔细测试配置文件的生效规则。


在代码设置和配置文件设置两个方式中,代码设置的优先级高于配置文件设置,我一般会在代码中自己设置,所以对于配置文件的加载方式,我没有仔细测试生效规则。以《基于Storm0.8.1用java语言开发一个完整的流式计算程序》中的例子为例,如果我执行如下的命令启动Topology

storm jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.ygc.mobilenet.MobileNetLogTopology 12 2

则在Storm UI界面中看到


根据代码,我们的Topology有4个组件,每个组件2个executor,每个executor有2个task,那么我们将预料到有4*2=8个executor,有8*2=16个Task。12个workers。但是图上显示的确不是。17个Task和9个Executors分别比我们预料的多了一个,这是因为Storm还要分配系统的worker来跟踪storm中处理的消息,默认的数量就是1个work,他对应的executor和task也默认是一个。而worker的数量不会大于executos的数量。所以就出现了上面的显示数字。

动态调整

实际上对于storm的动态调整功能,我是比较不满意的。根据官方说明可以通过Storm UI来reblance功能来调整或者在命令行中通过storm rebalance来动态调整。但实际上Storm UI上根本找不到这个功能(可能0.9版本有)。而在命令方式中,不能调整tasks的数量,只能调整executors的数量和worker的数量。worker的数量不可能大于executors的数量。所以最后,我们最多能够将worker的数量和executor的数量调整到和初始的task的数量相同。这种情况下在代码和配置文件中设置worker的数量 只有在worker的数量小于executor的数量的时候才有作用。那么我们设置并发参数的时候关键是要一开始就将Task的数量设置得合适,才能在后面不停止Topology运行的情况下动态扩展Topology的处理能力(增加线程和进程)。

我们执行如下的命令

storm rebalance mobilenetlogtopology -n 5 -e analyseMobileNetlog=6

可以想象,最终,我们可以缩减mobilenetlogtopology的work的数量从9到5,但是我们想增加该Topology的analyseMobileNetlog组件的executor的数量到6的时候,最多只能增加到他的初始Task的数量4。如下图所示


好了,关于storm的并行方面的问题讲完了。希望我已经讲清楚了。这个版本的这个方面我觉得做得还是不够完善。希望新版本中能够有所改善。

相关内容

    暂无相关文章