spark streaming 交互

Apache Spark 是一个高性能集群计算框架,其中 Spark Streaming 作为实时批处理组件,因为其简单易上手的特性深受喜爱。在 es-hadoop 2.1.0 版本之后,也新增了对 Spark 的支持,使得结合 ES 和 Spark 成为可能。

目前最新版本的 es-hadoop 是 2.1.0-Beta4。安装如下:

  1. wget http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
  2. wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta4.zip

然后通过 ADD_JARS=../elasticsearch-hadoop-2.1.0.Beta4/dist/elasticsearch-spark_2.10-2.1.0.Beta4.jar 环境变量,把对应的 jar 包加入 Spark 的 jar 环境中。

下面是一段使用 spark streaming 接收 kafka 消息队列,然后写入 ES 的配置:

  1. import org.apache.spark._
  2. import org.apache.spark.streaming.kafka.KafkaUtils
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.StreamingContext._
  5. import org.apache.spark.SparkContext
  6. import org.apache.spark.SparkContext._
  7. import org.apache.spark.SparkConf
  8. import org.apache.spark.sql._
  9. import org.elasticsearch.spark.sql._
  10. import org.apache.spark.storage.StorageLevel
  11. import org.apache.spark.Logging
  12. import org.apache.log4j.{Level, Logger}
  13. object Elastic {
  14. def main(args: Array[String]) {
  15. val numThreads = 1
  16. val zookeeperQuorum = "localhost:2181"
  17. val groupId = "test"
  18. val topic = Array("test").map((_, numThreads)).toMap
  19. val elasticResource = "apps/blog"
  20. val sc = new SparkConf()
  21. .setMaster("local[*]")
  22. .setAppName("Elastic Search Indexer App")
  23. sc.set("es.index.auto.create", "true")
  24. val ssc = new StreamingContext(sc, Seconds(10))
  25. ssc.checkpoint("checkpoint")
  26. val logs = KafkaUtils.createStream(ssc,
  27. zookeeperQuorum,
  28. groupId,
  29. topic,
  30. StorageLevel.MEMORY_AND_DISK_SER)
  31. .map(_._2)
  32. logs.foreachRDD { rdd =>
  33. val sc = rdd.context
  34. val sqlContext = new SQLContext(sc)
  35. val log = sqlContext.jsonRDD(rdd)
  36. log.saveToEs(elasticResource)
  37. }
  38. ssc.start()
  39. ssc.awaitTermination()
  40. }
  41. }

注意,代码中使用了 spark SQL 提供的 jsonRDD() 方法,如果在对应的 kafka topic 里的数据,本身并不是已经处理好了的 JSON 数据的话,这里还需要自己写一写额外的处理函数,利用 cast class 来规范数据。