Pulsar adaptor for Apache Spark

Spark structured streaming 连接器

Pulsar Spark Connector 是 Apache Pulsar 和 Apache Spark(数据处理引擎)的集成,它允许 Spark 使用 Spark structured streaming 和 Spark SQL 从 Pulsar 读取数据和将数据写入 Pulsar,并提供精准一次 source 语义和至少一次的 sink 语义。 详细信息,请参阅 StreamNative Hub 中的 Pulsar Spark Connector

Spark streaming 连接器

Pulsar的Spark Streaming receiver经过专门的定制,使 Apache Spark Streaming 能够从Pulsar接收数据.

应用程序可以通过 Spark Streaming Pulsar receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理.

先决条件

若要使用 receiver,请在 java 配置中包含 pulsar-spark 库的依赖。

Maven

如果你使用maven,添加以下内容到你的 pom.xml 中:

  1. <!-- in your <properties> block -->
  2. <pulsar.version>2.8.0</pulsar.version>
  3. <!-- in your <dependencies> block -->
  4. <dependency>
  5. <groupId>org.apache.pulsar</groupId>
  6. <artifactId>pulsar-spark</artifactId>
  7. <version>${pulsar.version}</version>
  8. </dependency>

Gradle

如果你使用Gradle,添加以下内容到你的 build.gradle 中:

  1. def pulsarVersion = "2.8.0"
  2. dependencies {
  3. compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
  4. }

用法

把一个 SparkStreamingPulsarReceiver 实例,传入 JavaStreamingContextreceiverStream方法中作为的参数:

  1. String serviceUrl = "pulsar://localhost:6650/";
  2. String topic = "persistent://public/default/test_src";
  3. String subs = "test_sub";
  4. SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
  5. JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
  6. ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
  7. Set<String> set = new HashSet<>();
  8. set.add(topic);
  9. pulsarConf.setTopicNames(set);
  10. pulsarConf.setSubscriptionName(subs);
  11. SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
  12. serviceUrl,
  13. pulsarConf,
  14. new AuthenticationDisabled());
  15. JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

For a complete example, click here. In this example, the number of messages that contain the string “Pulsar” in received messages is counted.

请注意,如果需要,可以使用其他 Pulsar 认证类。 例如,为了在认证过程中使用令牌,可以为 SparkStreamingPulsarReceiver 构造函数设置以下参数。

  1. SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
  2. serviceUrl,
  3. pulsarConf,
  4. new AuthenticationToken("token:<secret-JWT-token>"));