Pulsar adaptor for Apache Spark
Spark structured streaming 连接器
Pulsar Spark Connector 是 Apache Pulsar 和 Apache Spark(数据处理引擎)的集成,它允许 Spark 使用 Spark structured streaming 和 Spark SQL 从 Pulsar 读取数据和将数据写入 Pulsar,并提供精准一次 source 语义和至少一次的 sink 语义。 详细信息,请参阅 StreamNative Hub 中的 Pulsar Spark Connector 。
Spark streaming 连接器
Pulsar的Spark Streaming receiver经过专门的定制,使 Apache Spark Streaming 能够从Pulsar接收数据.
应用程序可以通过 Spark Streaming Pulsar receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理.
先决条件
若要使用 receiver,请在 java 配置中包含 pulsar-spark
库的依赖。
Maven
如果你使用maven,添加以下内容到你的 pom.xml
中:
<!-- in your <properties> block -->
<pulsar.version>2.8.0</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
如果你使用Gradle,添加以下内容到你的 build.gradle
中:
def pulsarVersion = "2.8.0"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
用法
把一个 SparkStreamingPulsarReceiver
实例,传入 JavaStreamingContext
的receiverStream
方法中作为的参数:
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
For a complete example, click here. In this example, the number of messages that contain the string “Pulsar” in received messages is counted.
请注意,如果需要,可以使用其他 Pulsar 认证类。 例如,为了在认证过程中使用令牌,可以为 SparkStreamingPulsarReceiver
构造函数设置以下参数。
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationToken("token:<secret-JWT-token>"));