Kafka-Storm 集成部署
Kafka-Storm 集成部署
前言
分布式实时计算的主要组件采用基于流式计算的 Apache Storm,而实时计算的数据源来自基础数据输入组件中的 Kafka,如何将 Kafka 的消息数据传入 Storm 就是本文讨论的内容。
0. 材料准备
- 正常稳定运行的 Kafka 集群(版本:Kafka 0.8.2)
- 正常稳定运行的 Storm 集群(版本:Storm 0.9.8)
- Maven 3.x
1. Storm Topology 工程
Storm 的任务(Job)称为 Topology,为了处理实时计算任务,需要新建一个 Storm Topology 工程。由于 Kafka 的消息传输模式,所谓的 Kafka-Storm 集成部署实际上就是需要实现一个接收 Kafka 消息的 Spout 接口。幸运的是,最新的 Storm 官方版本中已经内置了可靠的 KafkaSpout,不需要再去手工编写,只需要将 KafkaSpout 配置为 Topology 的输入数据源即可。
2. Maven 配置
本项目工程基于 Maven 构建。
- 需要配置的主要依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> <scope>provided</scope> </dependency>
注意:这里的依赖的 scope 均为“provided”
- Maven 编译配置
<build> <finalName>storm-kafka-topology</finalName> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> <configuration> <finalName>${project.artifactId}-${project.version}-shade</finalName> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <artifactSet> <excludes> <exclude>log4j:log4j:jar:</exclude> </excludes> </artifactSet> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>storm.kafka.example.StormTopology</mainClass> </transformer> </transformers> </configuration> </plugin> </plugins> </build>
3. 实现 Topology
以下是 Topology 的一个简单示例(Java 版)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class StormTopology { // Topology 关闭命令(通过外部传入消息控制) public static boolean shutdown = false; public static void main(String[] args) { // 注册 ZooKeeper 主机 BrokerHosts brokerHosts = new ZkHosts( "hd182:2181,hd185:2181,hd128:2181"); // 所接收 Kafka 的 topic 名称 String topic = "flumeTopic"; // ZooKeeper 的注册 node 名称(注意:需要加“/”,否则 ZooKeeper 会无法识别) String zkRoot = "/kafkastorm"; // 配置 Spout String spoutId = "myKafka"; SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId); // 配置 Scheme(可选) spoutConfig.scheme = new SchemeAsMultiScheme(new SimpleMessageScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", kafkaSpout); builder.setBolt("operator", new OperatorBolt()) .shuffleGrouping("kafka-spout"); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(3); // 测试环境采用 local mode 模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); while (!shutdown) { Utils.sleep(100); } cluster.killTopology("test"); cluster.shutdown(); } } |
由于一个 KafkaSpout 只能接收一个指定 topic 的消息数据,因此,在实际生产环境 Topology 的实现中需要根据业务需求配置 Spout 的个数。
4. 必要的依赖包
由于 Topology 工程的依赖均为“provided”的 scope,需要将涉及到的依赖jar包拷贝到 Storm 安装目录的 lib 文件夹下,包括:
- kafka_2.10-0.8.2.1.jar
- storm-kafka-0.9.3.jar
- scala-library-2.10.4.jar
- zookeeper-3.4.6.jar
- curator-client-2.6.0.jar
- curator-framework-2.6.0.jar
- curator-recipes-2.6.0.jar
- guava-16.0.1.jar
- metrics-core-2.2.0.jar
5. 上线运行
向 Storm 集群提交任务,观察数据输出结果。另外,还可以在 Storm 的 UI 界面查看 Topology 内部组件运行状态(需要使用 Cluster 模式)。
分布式发布订阅消息系统 Kafka 架构设计
Apache Kafka 代码实例
Apache Kafka 教程笔记
Apache kafka原理与特性(0.8V)
Kafka部署与代码实例
Kafka介绍和集群环境搭建
Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里
本文永久更新链接地址:
评论暂时关闭