【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建


一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正;

之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化思想,这样设计的原因有两方面:

一方面是可以模块化,功能划分更加清晰,从“数据采集--数据接入--流失计算--数据输出/存储”


那么接下来我们来看下整体的架构图



$tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local

$bin/flume-ng agent--conf conf --conf-file conf/flume-conf.properties --name producer-Dflume.root.logger=INFO,console


Kafka

的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

罗宝兄弟文章上的架构图是这样的

> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency

> bin/zookeeper-server-start.shconfig/zookeeper.properties
> bin/kafka-server-start.shconfig/server.properties
这里是官网上的教程,kafka本身有内置zookeeper,但是我自己在实际部署中是使用单独的zookeeper集群,所以第一行命令我就没执行,这里只是些出来给大家看下。

配置独立的zookeeper集群需要配置server.properties文件,讲zookeeper.connect修改为独立集群的IP和端口

zookeeper.connect=nutch1:2181

> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1--partition 1 --topic test
> bin/kafka-list-topic.sh --zookeeperlocalhost:2181

> bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test

> bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning

在实际开发中还是要自行开发自己的生产者与消费者;

kafka的安装也可以参考我之前写的文章:http://blog.csdn.net/weijonathan/article/details/18075967

Storm

GitHubClojure


  1. ØMQ

producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c




以上这个是我的test.log文件通过flume抓取传到kafka的数据;说明我们的flume和kafka流程走通了;

大家还记得刚开始我们的流程图么,其中有一步是通过flume到kafka,还有一步是到hdfs的;而我们这边还没有提到如何存入kafka且同时存如hdfs;

flume是支持数据同步复制,同步复制流程图如下,取自于flume官网,官网用户指南地址:http://flume.apache.org/FlumeUserGuide.html


怎么设置同步复制呢,看下面的配置:

#2个channel和2个sink的配置文件  这里我们可以设置两个sink,一个是kafka的,一个是hdfs的;
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
具体配置大伙根据自己的需求去设置,这里就不具体举例了

kafka和storm的整合

完成以上步骤之后,我们还有一件事情要做,就是使用kafka-storm0.8插件,写一个自己的Storm程序;

这里我给大伙附上一个我弄的storm程序,百度网盘分享地址:http://pan.baidu.com/s/1bnEdgh5;

先稍微看下程序的创建Topology代码


数据操作主要在WordCounter类中,这里只是使用简单JDBC进行插入处理


这里只需要输入一个参数作为Topology名称就可以了!我们这里使用本地模式,所以不输入参数,直接看流程是否走通;

storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology

先看下日志,这里打印出来了往数据库里面插入数据了


然后我们查看下数据库;插入成功了!


到这里我们的整个整合就完成了!

但是这里还有一个问题,不知道大伙有没有发现。这个也是@晨色星空J2EE跟我说的,其实我也应该想到的;

由于我们使用storm进行分布式流式计算,那么分布式最需要注意的是数据一致性以及避免脏数据的产生;所以我提供的测试项目只能用于测试,正式开发不能这样处理;

@晨色星空J2EE给的建议是建立一个zookeeper的分布式全局锁,保证数据一致性,避免脏数据录入!

zookeeper客户端框架大伙可以使用Netflix Curator来完成,由于这块我还没去看,所以只能写到这里了!


转载的话请注明来源地址:http://blog.csdn.net/weijonathan/article/details/18301321

相关内容

    暂无相关文章