Spark Streaming和Flume集成指南V1.4.1,flumev1.4.1


Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据。这里有两个方法。

Python API:Flume现在还不支持PythonAPI

 

方法1:Flume风格的推方法

Flume被设计用来在Flume代理之间推送数据。在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上。下面是配置的步骤。

 

一般需求

在你的集群中选择一台机器满足如下条件:

1.      当你的Flume+Spark Streaming程序启动之后,Spark节点之一必须运行在那台机器上。

2.      Flume可以配置为推送数据到那台机器的端口上。

根据推模型,流程序需要启动,同时接收器按计划运行并监听选择的端口,以让Flume能够推送数据。

 

配置Flume

配置Flume代理来发送数据到一个Avro池,需要在配置文件中加入如下的内容。

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

查看Flume文档来获取更多关于配置Flume代理的信息。


配置Spark Streaming程序

1.连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息)。

groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 1.4.1

2.编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。

Scala

importorg.apache.spark.streaming.flume._
val flumeStream = FlumeUtils.createStream(streamingContext, [chosenmachine's hostname], [chosen port])

Java

import org.apache.spark.streaming.flume.*;
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
        FlumeUtils.createStream(streamingContext, [chosen machine'shostname], [chosen port]);

查看API文档和示例。

注意,这里的主机名应该和集群中的资源管理器使用的主机名相同(Mesos,YARN或Spark Standalone),这样资源分配可以匹配名字并在正确的机器上启动接收器。

3.部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节)。

 

方法2:使用自定义池的拉方式

不是Flume直接推送数据到SparkStreaming,这种方法运行了一个如下所示的Flume池。

1.      Flume将数据推送到池中,然后数据在此处缓存。

2.      Spark Streaming使用一个可靠的Flume接收器和操作从池中拉取数据。只有在Spark Streaming接收到数据并且把数据复制后才认为操作成功。

这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置Flume运行一个自定义池。下面是配置步骤。

 

一般需求

选择一台在Flume代理中运行自定义池的机器。Flume其余的管道被配置为向那个代理发送数据。Spark集群中的机器都能连接到运行自定义池的那台机器上。

 

配置Flume

在选定的机器上配置Flume需要如下的两步。

1.池JAR包:添加如下的JAR包到要运行自定义池的机器中的Flume的classpath中(查看Flume的文档https://flume.apache.org/documentation.html)。

         (i)自定义池JAR包:下载与下面内容一致的JAR包(或直接下载的地址https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume-sink_2.10/1.4.1/spark-streaming-flume-sink_2.10-1.4.1.jar)

        groupId= org.apache.spark
        artifactId =spark-streaming-flume-sink_2.10
        version = 1.4.1

         (ii)Scala库JAR包:下载Scala库2.10.4版本JAR包。它可以用下面的内容找到(或直接在这里下载https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar)

        groupId= org.scala-lang
        artifactId = scala-library
        version = 2.10.4

         (iii)CommonsLang3 JAR包:下载Commons Lang 3 JAR包。它可以用下面的内容找到(或者直接下载https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar)

        groupId= org.apache.commons
        artifactId = commons-lang3
        version = 3.3.2

2.配置文件:在那台机器上,通过下面的配置文件配置Flume代理发送数据到一个Avro池中。

 agent.sinks = spark
 agent.sinks.spark.type =org.apache.spark.streaming.flume.sink.SparkSink
 agent.sinks.spark.hostname = <hostname ofthe local machine>
 agent.sinks.spark.port = <port to listen onfor connection from Spark>
 agent.sinks.spark.channel = memoryChannel

也要确保上行流的Flume管道配置了发送数据到运行这个池的Flume代理。

查看Flume文档寻找更多关于配置Flume代理的信息。

 

配置Spark Streaming程序

1.      连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking)

2.      编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。

Scala

importorg.apache.spark.streaming.flume._
 
val flumeStream =FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sinkport])


Java

importorg.apache.spark.streaming.flume.*;
 
JavaReceiverInputDStream<SparkFlumeEvent>flumeStream=
    FlumeUtils.createPollingStream(streamingContext, [sink machinehostname], [sink port]);

查看Scala示例https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala

注意每个输入DStream可以配置为从多个池中接收数据。

3.      部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications)。

相关内容