flume集成指南

flume是一个分布式的、稳定的、有效的服务,它能够高效的收集、聚集以及移动大量的日志数据。flume的架构如下图所示。

flume

本节将介绍怎样配置flume以及Spark Streaming如何从flume中接收数据。主要有两种方法可用。

方法一:基于推送的flume风格的方法

flume被设计用来在flume agent间推送数据。在这个方法中,Spark Streaming本质上是建立一个receiver,这个receiver充当一个Avro代理,用于接收flume推送的数据。下面是配置的过程。

一般需求

在你的集群中,选择一台满足下面条件的机器:

  • 当你的flume和Spark Streaming应用程序启动以后,必须有一个Spark worker运行在这台机器上面
  • 可以配置flume推送数据到这台机器的某个端口

因为是推送模式,安排好receiver并且监听选中端口的流应用程序必须是开启的,以使flume能够发送数据。

配置flume

通过下面的配置文件配置flume agent,发送数据到一个Avro sink。

  1. agent.sinks = avroSink
  2. agent.sinks.avroSink.type = avro
  3. agent.sinks.avroSink.channel = memoryChannel
  4. agent.sinks.avroSink.hostname = <chosen machine's hostname>
  5. agent.sinks.avroSink.port = <chosen port on the machine>

查看flume文档了解更多的信息。

配置Spark Streaming应用程序

  • 关联:在你的SBT或者Maven项目定义中,引用下面的组件到流应用程序中。
    1. groupId = org.apache.spark
    2. artifactId = spark-streaming-flume_2.10
    3. version = 1.1.1
  • 编程:在应用程序代码中,引入FlumeUtils创建输入DStream。
    1. import org.apache.spark.streaming.flume._
    2. val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
    查看API文档例子

注意,hostname必须和集群(Mesos,YARN或者Spark Standalone)的resource manager所使用的机器的hostname是同一个,这样就可以根据名称分配资源,在正确的机器上启动receiver

  • 部署:将spark-streaming-flume_2.10和它的依赖(除了spark-core_2.10spark-streaming_2.10)打包到应用程序jar包中。然后用spark-submit方法启动你的应用程序。

方法2:利用自定义sink的基于拉的方法

作为直接推送数据到Spark Streaming方法的替代方法,这个方法运行一个自定义的flume sink用于满足下面两点功能。

  • flume推送数据到sink,该数据被缓存在sink中。
  • Spark Streaming利用事务从sink中拉取数据。只有数据接收并且被Spark Streaming复制了之后,事务才算成功。这使这个方法比之前的方法具有更好的稳定性和容错性。然而,这个方法需要flume去
    运行一个自定义sink。下面是配置的过程。

一般需求

选择一台机器在flume agent中运行自定义的sink,配置余下的flume管道(pipeline)发送数据到agent中。集群中的其它机器应该能够访问运行自定义sink的机器。

配置flume

在选定的机器上面配置flume需要以下两个步骤。

  • Sink Jars:添加下面的jar文件到flume的classpath目录下面
    • 自定义sink jar:通过下面的方式下载jar(或者这里
      1. groupId = org.apache.spark
      2. artifactId = spark-streaming-flume-sink_2.10
      3. version = 1.1.1
    • scala library jar:下载scala 2.10.4库,你能够通过下面的方式下载(或者这里)
      1. groupId = org.scala-lang
      2. artifactId = scala-library
      3. version = 2.10.4
  • 配置文件:通过下面的配置文件配置flume agent用于发送数据到Avro sink。
  1. agent.sinks = spark
  2. agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
  3. agent.sinks.spark.hostname = <hostname of the local machine>
  4. agent.sinks.spark.port = <port to listen on for connection from Spark>
  5. agent.sinks.spark.channel = memoryChannel

要确保配置的逆流flume管道(upstream Flume pipeline)运行这个sink发送数据到flume代理。

配置Spark Streaming应用程序

  • 关联:在你的SBT或者Maven项目定义中,引入spark-streaming-flume_2.10组件
  • 编程:在应用程序代码中,引入FlumeUtils创建输入DStream。
  1. import org.apache.spark.streaming.flume._
  2. val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

可以查看用例FlumePollingEventCount

注意,每个输入DStream都可以配置为从多个sink接收数据。

  • 部署:将spark-streaming-flume_2.10和它的依赖(除了spark-core_2.10spark-streaming_2.10)打包到应用程序的jar包中。然后用spark-submit方法启动你的应用程序。