K天熟悉Apache Storm (一),apachestorm


最近找工作看到很多大数据处理的基本都是要求Hadoop、MapReduce之类的,其中如果有熟悉Storm、Spark的会有加分。Storm、Spark属于大数据实时处理的框架,MapReduce是属于离线的、非实时的。这几天都在看Storm,(Spark之前有看过,不过当时耽搁了,不是很深入,之前看的是《Fast Data Processing with Spark》)想记录一下自己的学习过程,希望可以和同学们共同探讨。。。

看一个技术框架,或许首先应该先google下,看下别人写的博客,技术文章等,大概了解下,如果英语过关,可以直接看官网文档(不过个人感觉storm的文档有点不是很好,根据其提供的例子弄了好些天才搞定,当然或许是自己能力太差也说不定);然后就是搭建集群(Storm集群、当然单机也可以学习的);运行自己的第一个“Word Count”程序(大数据的“hello world”?);结合一些讲原理的书籍或者博客看WordCount的代码,对照理解;尝试自己写代码,并运行,总结经验;之后就是慢慢的积累过程了!(以上纯属个人观点)

Storm简介:http://www.searchtb.com/2012/09/introduction-to-storm.html , 一篇阿里的技术博客,感觉不错(有一定Hadoop基础看着理解会好点)。里面讲到了Storm记录级容错的原理,看的不是很明白(能力不够呀!)。

一、Storm安装配置

首先参考了官网的配置:https://storm.apache.org/documentation/Setting-up-development-environment.html ,跑着有问题。接着,网上找了一篇:http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/,这个感觉靠谱点(我就是参考这个配置的)。

集群配置:

node101 : CentOS6.5  64bit、2G 内存、nimbus、ui、zookeeper(虚拟机)

node102 : CentOS6.5  64bit、1G 内存、supervisor(虚拟机)

node103 : CentOS6.5  64bit、1G 内存、supervisor(虚拟机)

版本: Storm:0.9.3 、Zookeeper:zookeeper-3.4.6

1. 安装并启动Zookeeper

1)下载Zookeeper,解压到/opt文件夹(这个文件夹可以自定义)

2)进入解压后的bin目录执行./zkServer.sh start ;

3)查看是否启动:

[root@node101 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
2. 安装并配置Storm:

(jdk不用说了,自己安装吧)

1)下载ZeroMQ、JZMQ并安装

下载地址:https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm 、https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm。 这两个rpm官网没有说要安装,不安装的话,后面可能会有问题。(我现在都不清楚后面出现的问题是否是因为这个没有安装)

yum install zero*
yum install jzmq*

2)下载Storm,并配置;

在Storm官网下载0.9.3版本,并解压到/opt目录。修改conf/storm.yaml如下:

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
     - "node101"
#     - "server2"
#
#
 storm.local.dir: "/opt/data/storm" 
 nimbus.host: "node101"

 supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

然后使用scp把这个文件拷贝到node102、node103机器对应的文件夹中

3)启动storm相关进程

可以在bin目录建立一个start.sh文件,并赋予执行权限;在nimbus机器建立的文件内容如下:

#!/bin/bash

/opt/apache-storm-0.9.3/bin/storm nimbus > /dev/null 2>&1 &
/opt/apache-storm-0.9.3/bin/storm ui > /dev/null 2>&1 &
在supervisor机器建立的文件如下:

#!/bin/bash

/opt/apache-storm-0.9.3/bin/storm supervisor > /dev/null 2>&1 &
当然,也可以参考http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/使用supervision把Storm的相关进程管理起来,这样就可以直接使用service命令了。

分别在node101、node102、node103中执行./start.sh 即可启动Storm集群。

4) ui监控

浏览器访问:http://node101:8080 ,即可看到集群的监控,如下:



第一次启动的时候在Topology summary中不会有内容。

这样集群就启动了!

二、Storm第一个程序WordCount

WordCount程序参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》

1) 下载https://github.com/storm-book/examples-ch02-getting_started/zipball/master ,导入到自建的java工程中,新建的java工程需要导入Storm的相关包。

2)修改WordCount使其可以在集群中运行:

a. 修改TopologyMain.java

package main;
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;


public class TopologyMain {
	public static void main(String[] args) throws InterruptedException {
        
		if(args.length!=4){
			System.out.println("<input> <threadtime> <parallel_wc> <local|distribute>");
			System.exit(-1);
		}
		
        //Topology definition
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader",new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer())
			.shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(),Integer.parseInt(args[2]))
			.fieldsGrouping("word-normalizer", new Fields("word"));
		
        //Configuration
		Config conf = new Config();
		conf.put("wordsFile", args[0]);
		conf.setDebug(false);
        //Topology run
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		if(args[3].equals("local")){
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
		try{
			Thread.sleep(Long.parseLong(args[1]));
		}	catch(Exception e){
			e.printStackTrace();	
		}
			
			cluster.shutdown();
		}else if ("distribute".equals(args[3])){
			try {
				StormSubmitter.submitTopology("wc1-1", conf,
						 builder. createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}else{
			System.out.println("Wrong mode!");
			System.exit(-1);
		}
		System.out.println(new java.util.Date()+": 任务已经提交到Strom集群!");
	}
}

这里的提交方式可以选择两种,一种是Local模式一种是分布式的。

b. WordCounter.java 修改

@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String str = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1 
		 */
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			Integer c = counters.get(str) + 1;
			counters.put(str, c);
		}
		System.out.println("Counter.size:"+counters.size()+",input:"+str);
	}

修改其exec方法,在最后加上一句打印即可;

3) 打包运行:

使用eclipse的export把代码打包到jar文件中,在node101中使用storm jar提交任务:

storm jar /opt/storm_user_lib/wc1.1.jar main.TopologyMain /opt/wc.txt 5000 2 distribute
这里需要先在node102、node103机器的/opt目录下新建wc.txt文档,不然会报文件找不到的错误(其实只用在运行的节点上运行即可);

wc.txt:(可以自定义)

we are the future
see me 
ok
see you 
because what i am doing
what the fuck

4)查看结果:

在storm 监控界面找到名字为 wc1-1的Topology,找到其wordcount的bolt,查看其运行节点及worker端口,如下

在节点node102中的logs中查看work-6702.log文件:

但是这里并没有看到cleanup的打印信息代码,这个是因为cleanup会在集群关闭的时候调用,所以,如果你把集群关闭就可以看到打印的结果了。(如何关闭集群?jps 查看相关的进程,kill -9直接杀掉)。

三、Storm Real-Life 例子

参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》 chapter6 

版本:Storm:0.9.3 ,Redis:2.8.19; Nodejs:0.12.0;jedis:2.6.2;

1)下载、编译、安装Redis、Nodejs:

redis
  下载地址:http://redis.io/download
  安装出错:http://www.cnblogs.com/la-isla-bonita/p/3582751.html
nodejs
  下载地址:https://nodejs.org/download/

jedis

   下载地址:https://github.com/xetorthio/jedis/releases ,下载源码使用mvn编译(使用mvn clean install -Dmaven.test.skip=true命令)

2)下载源码,并打包:

下载地址:https://github.com/storm-book/examples-ch06-real-life-app。

下载源码并导入后,如果没有使用mvn构建的话,会报错。首先把jedis jar包加入classpath路径中,同时也需要把jedis jar包加入到storm_home/lib/下面(三个节点都需要,其实应该只用两个supervisor的就行吧);

由于版本的原因,所以会有些地方报错,一般包括下面三个部分:

a. TopologyStarter.java :

修改redis、nodejs的机器名

public final static String REDIS_HOST = "node101";
	public final static int REDIS_PORT = 6379;
	public final static String WEBSERVER = "http://node101:3000/news";
去掉下面这句,应该是在21行左右;

//        Logger.getRootLogger().removeAllAppenders();

由于虚拟机资源有限,所以把所有的并行都改成了2;

builder.setSpout("read-feed", new UsersNavigationSpout(), 2);
        
        builder.setBolt("get-categ", new GetCategoryBolt(), 2)
        				.shuffleGrouping("read-feed");
        
        builder.setBolt("user-history", new UserHistoryBolt(), 2)
        				.fieldsGrouping("get-categ", new Fields("user"));
        
        builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(), 2)
        				.fieldsGrouping("user-history", new Fields("product"));
提交任务改为集群提交:

//        LocalCluster cluster = new LocalCluster();
//        cluster.submitTopology("analytics", conf, builder.createTopology());
        StormSubmitter.submitTopology("analytics", conf,
				 builder. createTopology());
b. ProductCategoriesCounterBolt.java、utilities/ProductsReader.java

由于jedis使用中如果有超时,那就会报下面的错误:

redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
或
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to [B
一种改法是加大time out的时间,修改上面两个类中的reconnect方法
public void reconnect() {
//		this.jedis = new Jedis(host, port);
		this.jedis = new Jedis(host, port,10000); // 修改默认time out 到10s
	}

c. NewsNotifierBolt.java 

经过b。a的修改后,还会报错,需要修改上面这个类,把http相关的包改为storm下面的http包相关类

import org.apache.storm.http.HttpResponse;
import org.apache.storm.http.client.HttpClient;
import org.apache.storm.http.client.methods.HttpPost;
import org.apache.storm.http.entity.StringEntity;
import org.apache.storm.http.impl.client.DefaultHttpClient;

经过a、b、c的修改,现在可以把源码打成jar包并运行了!


3) 运行,并查看结果

在node101中运行redis: nohup redis-server & ; 

运行jar包:storm jar real-life1.1.jar storm.analytics.TopologyStarter ;

运行Nodejs:node webapp/app.js;

在浏览器打开http://node101:3000,即可看到下图

点击某个产品,接着查看相关条目,多点击几次,就会有数据了,如下:


同时,查看Storm的监控,可以看到Storm的spout和bolt模块的传输数据记录:


实例来自《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》。

例子跑完后,可以开始具体分析各个步骤了,暂时不想分析real-life的例子,有点复杂,可以先分析wordcount,同时结合《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》这本电子书。




分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990



相关内容