基于Storm0.8.1用java语言开发一个完整的流式计算程序


1.概述

上一篇文章中,我们已经将storm0.8.1的环境搭建好了。现在是时候写一个实用的程序并且验证一下storm强大威力的时候了。其实,官方的样例就比较完整的介绍了。一来那个是英文的,二来那个例子略显简单,所以这里我主要介绍用java来编写一个具备完整需求的storm0.8.1流式计算程序的例子。首先需要介绍一下storm的编程模型。

2.编程模型

在storm的编程模型中,主要有三个组件,一个是Topology,这个Topology是一个由多个计算节点构成的拓扑图。结算节点分为两种,一种是Spout,一种是Blot。这些Spout和Bolt构成下图所示的一个多节点的有向图。(数据从左向右流动)。整个图是一个Topology,其中有2个Spout和4个Bolot组成。

Spout是Storm的Topoloy的入口,数据流都是从Spout进入topology来进行处理。他负责从外部接收数据,处理,然后发射出去。每个Topology至少要有一个Spout

Bolt是数据处理单元,他可以从一个或者多个其他的计算节点接收数据,处理并发射出去。他可以从Spout接收数据,也可以从其他的Bolt接收数据。Bolt数量可以为0。

Spout的数据发射给谁,Bolt从谁接收数据这些将不会再Spout和Bolt类中声明,在组装Topology的时候说明。

3.编程步骤

我们要编写一个完整的storm的程序,需要经过四步

4.详细实现

4.1需求

假设我们会不断的将一些移动用户上网日志记录文件存储到HDFS的某个目录下,要求我们能够快速的将新到达的文件解析成我们需要的结构,存储到HBase数据库的上网日志表中,并且对每个用户上网记录的次数做统计,超出门限值后存储到Hbase的超门限信息表中。

4.2具体实现

通过分析,我们采用storm来实现该需求,首先我们设计出我们的处理架构图如下所示

由以上设计,我们需要实现一个Spout类(com.ygc.mobilenet.MobileNetLogAnalyseSpout)负责从HDFS读取数据,初步处理然后发射(emit)出去。我们要实现三个Bolt类,一个Bolt类(com.ygc.mobilenet.MobileNetLogSaveBolt)负责获取每条记录,并存储到HBase数据库中。一个Bolt类(com.inspur.mobilenet.MobileNetLogStatisticsBolt)负责按手机号在时间窗口内统计上网记录数。一个Bolt类(com.inspur.mobilenet.MobileNetLogThresholdBolt)负责检查每个手机号时间窗口内上网记录数是否超门限,如果超门限则保存到Hbase数据库中。

<dependencies>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.ygc.hadoop</groupId>
            <artifactId>hadoop2client</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

我对HBase和HDFS的访问都封装到hadoop2client这个jar包中了,而这个jar包依赖的hadoop和hbase的包在他自己的pom.xml文件中声明,不需要我们自己显式的声明。这是maven的好处,只用声明直接依赖即可。

以下章节分别描述每个部分具体的实现。

4.2.1Spout实现

4.2.1.1com.ygc.mobilenet.MobileNetLogVal 类

这个是一个辅助类,用来描述读取的数据文件结构的,不需要解释
package com.ygc.mobilenet;

import backtype.storm.tuple.Fields;

/**
 * 用来描述要解析的日志内容的辅助类
 */
public class MobileNetLogVal {
    //上网开始时间
    public static String START_TIME_FIELD = "START_TIME";
    //上网响应时间
    public static String RESPONSE_TIME_FIELD = "RESPONSE_TIME";
    //响应结束时间
    public static String END_TIME_FIELD = "END_TIME";
    //源设备IP
    public static String SOURCE_DEV_IP_FIELD = "SOURCE_DEV_IP";
    //源用户IP
    public static String SOURCE_USER_IP_FIELD = "SOURCE_USER_IP";
    //源端口
    public static String SOURCE_PORT_FIELD = "SOURCE_PORT";
    //目标设备IP
    public static String DEST_DEV_IP_FIELD = "DEST_DEV_IP";
    //目标用户IP
    public static String DEST_USER_IP_FIELD = "DEST_USER_IP";
    //目标端口
    public static String DEST_PORT_FIELD = "DEST_PORT";
    //IMSI标识设备
    public static String IMSI_FIELD = "IMSI";
    //MSISDN标识手机号码
    public static String MSISDN_FIELD = "MSISDN";
    //移动上网接入点
    public static String APN_FIELD = "APN";
    //用户操作系统。一般是浏览器型号或者程序名
    public static String USER_AGENT_FIELD = "USER_AGENT";
    //目标URL
    public static String URL_FIELD = "URL";
    //目标主机,包含在URL中
    public static String HOST_FIELD = "HOST";


    /**
     * 创建一个Spout要发射的数据结构。
     *
     * @return 返回Spout要发射的数据结构。
     */
    public Fields createFields() {
        return new Fields(
                START_TIME_FIELD,
                RESPONSE_TIME_FIELD,
                END_TIME_FIELD,
                SOURCE_DEV_IP_FIELD,
                SOURCE_USER_IP_FIELD,
                SOURCE_PORT_FIELD,
                DEST_DEV_IP_FIELD,
                DEST_USER_IP_FIELD,
                DEST_PORT_FIELD,
                IMSI_FIELD,
                MSISDN_FIELD,
                APN_FIELD,
                USER_AGENT_FIELD,
                URL_FIELD,
                HOST_FIELD);
    }
}

4.2.1.2com.ygc.mobilenet.

package com.ygc.mobilenet;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.ygc.hadoop.hdfs.HDFSClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import java.io.BufferedReader;
import java.io.File;
import java.util.Map;


/**
 * @author inspur research
 * @since 2014-01-09
 * 接收移动用户上网记录并发射
 */
public class MobileNetLogAnalyseSpout implements IRichSpout {
    private SpoutOutputCollector outputCollector;
    private HDFSClient hdfsClient;
    private Log log = LogFactory.getLog(MobileNetLogAnalyseSpout.class);
    //要监控的HDFS文件目录
    private String MONITOR_PATH = "/storm/mobilelog";


    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //每个Spout或者Blot要emit数据,必须指定数据的结构。
        outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
    }


    /**
     * 解析一行数据,转换为后续要处理的数据结构,这里从数据中读取了15列数据
     * @param line 从数据源接收的一行数据
     * @return 格式化好的Values对象。
     */
    public Values createValues(String line) {
        String[] cols = line.split("[|]");
        return new Values(
                cols[0],
                cols[1],
                cols[2],
                cols[4],
                cols[6],
                cols[12],
                cols[5],
                cols[7],
                cols[13],
                cols[28],
                cols[29],
                cols[32],
                cols[33],
                cols[30],
                cols[31]
        );
    }


    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }


    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //需要通过SpoutOutputCollector对象来emit数据流和
        this.outputCollector = spoutOutputCollector;
        try {            
            //读取HDFS文件的客户端,自己实现
            hdfsClient = new HDFSClient();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Override
    public void close() {
    }


    @Override
    public void activate() {
    }


    @Override
    public void deactivate() {
    }


    @Override
    public void nextTuple() {
        Utils.sleep(10000);
        try {
            //该方法从指定的目录中中读取符合条件的文件列表,并随机从中选择一个将其独占并返回文件名
            String lockFileName = hdfsClient.lockFileName(MONITOR_PATH, ".TXT");
            if (lockFileName != null) {
                //从HDFS中打开已经被独占的文本文件
                BufferedReader read = hdfsClient.getBufferedReader(lockFileName);
                String line;
                int count = 0;
                while ((line = read.readLine()) != null) {
                    if (line.trim().length() > 0) {
                        count++;                        
                        this.outputCollector.emit(createValues(line));
                    }
                }
                log.info("deal file " + lockFileName + " " + count + " rows!");
            }           
        } catch (Exception e) {
            log.error(e);
        }
    }


    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }
}


4.2.1.3重点解释

要实现一个Spout类,一般的做法是实现backtype.storm.topology.IRichSpout接口,重点的是要实现下面的几个接口

4.2.1.3.1public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 接口
在这个接口里,我们要声明本Spout要发射(emit)的数据的结构,及一个backtype.storm.tuple.Fields对象。这个对象和public void nextTuple()接口中emit的backtype.storm.tuple.Values共同组成了一个元组对象(backtype.storm.tuple.Tuple)供后面接收该数据的Blot使用
4.2.1.3.2public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector)接口
该接口是初始化接口。在这里要将SpoutOutputCollector spoutOutputCollector对象保存下来,供后面的public void nextTuple()接口使用,还可以执行一些其他的操作。例如这里将HDFSClient对象初始化了。
4.2.1.3.3public void nextTuple() 接口
该接口实现具体的读取数据源的方法。本例中是从HDFS中读取一个数据文件,并逐行解析,将获取的数据emit出去。emit的对象是通过public Values createValues(String line)方法生成的backtype.storm.tuple.Values对象。该方法从数据源的一行数据中,选取的15个目标值组成一个backtype.storm.tuple.Values对象。这个对象中可以存储不同类型的对象,例如你可以同时将String对象,Long对象存取在一个backtype.storm.tuple.Values中emit出去。实际上只要实现了Storm要求的序列化接口的对象都可以存储在里面。emit该值得时候需要注意,他的内容要和declareOutputFields中声明的backtype.storm.tuple.ields对象相匹配,必须一一对应。他们被共同组成一个backtype.storm.tuple.Tuple元组对象,被后面接收该数据流的对象使用。

4.2.2Bolt实现


4.2.2.1com.ygc.mobilenet.MobileNetLogSaveBolt类

package com.ygc.mobilenet;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.inspur.hadoop.hbase.HTableClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;

/**
 * 负责将所有上网记录保存到HBase数据库中
 */
public class MobileNetLogSaveBolt implements IRichBolt {
    private OutputCollector outputCollector;
    private HTableClient tableClient;
    private Log log = LogFactory.getLog(MobileNetLogSaveBolt.class);

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        try {
            //初始化HBase数据库
            tableClient = new HTableClient("192.168.1.230");

        } catch (Exception e) {
            log.error(e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            log.info("access tuple " + tuple.getValues()+"files info = " +tuple.getFields().toString());
            saveTupleToHBase(tuple);
            this.outputCollector.emit(tuple, tuple.getValues());
        } catch (Exception e) {
            log.error(e);
        } finally {
            outputCollector.ack(tuple);
        }
    }

    /**
     * 将接收的元组保存到数据库
     * @param tuple  从其他计算节点接收的数据流,这里是从com.ygc.mobilenet.MobileNetLogAnalyseSpout中接收的数据流。注意和该类中声明的backtype.storm.tuple.Fields对象和emit的backtype.storm.tuple.Values对象对应
     * @return
     */
    private boolean saveTupleToHBase(Tuple tuple) {
        try {
            String rowKey = createRowKeyByeTuple(tuple);
            long ts = System.currentTimeMillis();
            tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
                    MobileNetLogVal.START_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
                    MobileNetLogVal.RESPONSE_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.RESPONSE_TIME_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
                    MobileNetLogVal.END_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.END_TIME_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
                    MobileNetLogVal.SOURCE_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_DEV_IP_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
                    MobileNetLogVal.SOURCE_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_USER_IP_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
                    MobileNetLogVal.SOURCE_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_PORT_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
                    MobileNetLogVal.DEST_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_DEV_IP_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
                    MobileNetLogVal.DEST_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_USER_IP_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
                    MobileNetLogVal.DEST_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_PORT_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
                    MobileNetLogVal.APN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.APN_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
                    MobileNetLogVal.IMSI_FIELD, ts, tuple.getStringByField(MobileNetLogVal.IMSI_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
                    MobileNetLogVal.MSISDN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
                    MobileNetLogVal.URL_FIELD, ts, tuple.getStringByField(MobileNetLogVal.URL_FIELD));
            tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
                    MobileNetLogVal.HOST_FIELD, ts, tuple.getStringByField(MobileNetLogVal.HOST_FIELD));
        } catch (Exception e) {
            log.error(e);
            return false;
        }
        return true;
    }

    private String createRowKeyByeTuple(Tuple tuple) {
        return tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD)
                + tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

需要注意几个方面:

4.2.2.2com.ygc.mobilenet.MobileNetLogStatisticsBolt

package com.ygc.mobilenet;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * 统计时间窗口内上网记录数。这里没有实现时间窗口。
 */
public class MobileNetLogStatisticsBolt implements IRichBolt {
    private OutputCollector outputCollector;
    private Map<String, Integer> requestStatistic = new HashMap<String, Integer>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String msisdn = tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD);
        Integer count = 1;
        if (requestStatistic.containsKey(msisdn)) {
            count = requestStatistic.get(msisdn) + 1;
        }
        requestStatistic.put(msisdn, count);
        this.outputCollector.emit(tuple, new Values(msisdn, count));
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(MobileNetLogVal.MSISDN_FIELD,"count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

重点注意:

4.2.2.3com.ygc.mobilenet.MobileNetLogThresholdBolt

package com.ygc.mobilenet;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.inspur.hadoop.hbase.HTableClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;

/**
 * 负责检查用户上网次数是否门限,如果超门限则保存到HBase数据库中
 */
public class MobileNetLogThresholdBolt implements IRichBolt {
    private OutputCollector outputCollector;
    //操作HBase数据库的客户端
    private HTableClient tableClient;
    private Log log = LogFactory.getLog(MobileNetLogThresholdBolt.class);

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        try {
            tableClient = new HTableClient("192.168.1.230");
        } catch (Exception e) {
            log.error(e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        log.info("deal data " + tuple.getString(0) + "=" + tuple.getInteger(1));
        if (tuple.getInteger(1) > 2) {
            try {
                tableClient.insertRow("t_mobilenet_threshold", tuple.getString(0), "STAT_INFO", "COUNT", String.valueOf(tuple.getInteger(1)));
            } catch (Exception e) {
                log.error(e);
            }
        }
        this.outputCollector.emit(tuple, tuple.getValues());
        this.outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

重点解释:
只要注意在从元组中取数的时候,tuple.getString(0)和tuple.getInteger(1)分别对应了com.inspur.mobilenet.MobileNetLogStatisticsBolt对象中发射的MobileNetLogVal.MSISDN_FIELD,"count"字段对应即可。在com.inspur.mobilenet.MobileNetLogStatisticsBolt中我们取元组数据的时候是通过Field的名字来取的,这里是通过序号来取的。

4.2.3组装实现

现在,我们开发的组件需要组装成Topology了这个是在一个带Main函数的类里面实现的。从上面的设计图看,结构应该是com.ygc.mobilenet.MobileNetLogAnalyseSpout作为该Topology读取数据的源,他发射的数据分别被com.ygc.mobilenet.MobileNetLogSaveBolt、com.ygc.mobilenet.MobileNetLogStatisticsBolt对象接收。而com.ygc.mobilenet.MobileNetLogStatisticsBolt对象发射的数据流被com.ygc.mobilenet.MobileNetLogThresholdBolt对象接收。所以组装Topology的代码如下所示

package com.ygc.mobilenet;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * 组装storm的Topology的代码
 */
public class MobileNetLogTopology {
    public static void main(String[] args) {


        if (args.length > 0) {
            int worknum = Integer.parseInt(args[0]);
            int execunum = Integer.parseInt(args[1]);
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum);
            builder.setBolt("saveMobileNetlog", new MobileNetLogSaveBolt(), execunum).shuffleGrouping("analyseMobileNetlog");
            builder.setBolt("countMobileNetlog", new MobileNetLogStatisticsBolt(), execunum).fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD));
            builder.setBolt("thresholdMobileNetlog", new MobileNetLogThresholdBolt(), execunum).shuffleGrouping("countMobileNetlog");
            Config conf = new Config();
            conf.setNumWorkers(worknum);
            conf.setMaxSpoutPending(5000);
            try {
                StormSubmitter.submitTopology("mobilenetlogtopology", conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在组装Topology的过程中涉及到一个Stream Grouping的概念,这个概念就是Topology的各个计算单元之间数据流分发的一个策略。不同的策略应用在不同的场景下。例如上面第三节,要求某个字段(手机号)相同的数据必须发送到后续计算节点同一个任务中。Stream Grouping默认提供如下的类别

4.2.4提交执行

提交这个storm的Topology任务之前,要将以上代码需要的相关jar中和storm0.8.1无关的其他jar包全部打到一个jar中,例如上述相关的就有自己实现的HDFSClient、HTableClient以及他们相关的Hadoop、Hbase依赖包都打包到同一个stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar文件中。这里可以使用maven的装配插件来完成这个打包的工作

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>                    
                </configuration>
            </plugin>
        </plugins>
    </build>

这个组装插件可以将你的项目依赖的所有jar包解压和你的class一起打包到你的jar文件中。当然,里面也会有一些坑。例如多个jar包中有同一个配置文件互相覆盖导致的问题我就遇到过。
打包好了后,将该jar文件上传到storm的nimbus进程所在的服务器上,执行如下的命令

storm jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.ygc.mobilenet.MobileNetLogTopology 16 4

其中16、4是com.ygc.mobilenet.MobileNetLogTopology对象要读取的的命令行参数。分别用户用来设置执行该Topology的worker的数量和executors的数量。这两个数量分别设置了这个Topoploy要执行的进程数和每个计算单元执行的线程数。

如果成功,会显示如下的信息

0    [main] INFO  backtype.storm.StormSubmitter  - Jar not uploaded to master yet. Submitting jar...
10   [main] INFO  backtype.storm.StormSubmitter  - Uploading topology jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
593  [main] INFO  backtype.storm.StormSubmitter  - Successfully uploaded topology jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
593  [main] INFO  backtype.storm.StormSubmitter  - Submitting topology mobilenetlogtopology in distributed mode with conf {"topology.workers":16,"topology.max.spout.pending":5000}
719  [main] INFO  backtype.storm.StormSubmitter  - Finished submitting topology: mobilenetlogtopology

在Storm UI界面上将看到如下的界面


4.2.5执行效果

4.2.5.1查看Storm UI效果

现在我们将几个要处理的上网日志文件上传到HDFS的/storm/mobilelist文件夹中。等一会刷新storm UI界面,点击上图中的mobilenetlogtopology链接,可以看到如下的画面。里面是该Topology运行的详情


4.2.5.2查看HBase数据

在HBase的HMaster上执行

hbase shell

进入HBase交互界面,执行如下的命令

scan 't_mobilenet_log'


相关内容

    暂无相关文章