Pulsar的Spark Streaming receiver经过专门的定制,使 Apache Spark Streaming 能够从Pulsar接收数据.
应用程序可以通过 Spark Streaming Pulsar receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理.
先决条件
若要使用 receiver,请在 java 配置中包含 pulsar-spark
库的依赖。
Maven
如果你使用maven,添加以下内容到你的 pom.xml
中:
<!-- 在你的 <properties> 部分-->
<pulsar.version>2.6.1</pulsar.version>
<!-- 在你的 <dependencies> 部分-->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
如果你使用Gradle,添加以下内容到你的 build.gradle
中:
def pulsarVersion = "2.6.1"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
用法
把一个 SparkStreamingPulsarReceiver
实例,传入 JavaStreamingContext
的receiverStream
方法中作为的参数:
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
示例
You can find a complete example here. 该示例统计接收的消息有多少条包含字符串 “Pulsar”。
当前内容版权归 Apache Pulsar 或其关联方所有,如需对内容或内容相关联开源项目进行关注与资助,请访问 Apache Pulsar .