Pulsar adaptor for Apache Spark
Spark 流式接收器
Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接收原始数据。
应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理。
先决条件
若要使用 receiver,请在 java 配置中包含 pulsar-spark
库的依赖。
Maven
如果你使用maven,添加以下内容到你的 pom.xml
中:
<!-- in your <properties> block -->
<pulsar.version>2.9.2</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
如果你使用Gradle,添加以下内容到你的 build.gradle
中:
def pulsarVersion = "2.9.2"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
用法
把一个 SparkStreamingPulsarReceiver
实例,传入 JavaStreamingContext
的receiverStream
方法中作为的参数:
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
For a complete example, click here. In this example, the number of messages that contain the string “Pulsar” in received messages is counted.
请注意,如果需要,可以使用其他 Pulsar 认证类。 例如,为了在认证过程中使用令牌,可以为 SparkStreamingPulsarReceiver
构造函数设置以下参数。
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationToken("token:<secret-JWT-token>"));