Pulsar Java 客户端可以用于创建Java生产者(Producer)和消费者(Consumer),消息读取器以及执行管理的任务 Java 客户端的当前版本为 2.6.1。
Pulsar客户端的Javadoc分成了两个包:
包 | Description | Maven Artifact |
---|---|---|
org.apache.pulsar.client.api | 生产者和消费者API | org.apache.pulsar:pulsar-client:2.6.1 |
org.apache.pulsar.client.admin | Java 管理API | org.apache.pulsar:pulsar-client-admin:2.6.1 |
本文档仅关注Pulsar主题消息的生产和消费的客户端API. 关于使用 Java 管理客户端的指南, 请参见 Pulsar管理接口。
安装
The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client
library to your build configuration.
Maven
如果你使用maven,添加以下内容到你的 pom.xml
中:
<!-- 在你的 <properties> 部分-->
<pulsar.version>2.6.1</pulsar.version>
<!-- 在你的 <dependencies> 部分-->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
如果你使用Gradle,添加以下内容到你的 build.gradle
中:
def pulsarVersion = '2.6.1'
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}
连接URL
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar
scheme and have a default port of 6650. Here’s an example for localhost
:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you’re using TLS authentication, the URL will look like something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
客户端配置
你可以用一个URL来实例化一个连接到指定的Pulsar 集群的 PulsarClient 对象,像这样:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
默认的broker URL是单机集群。
如果你使用单机模式运行一个集群,broker将默认使用
pulsar://localhost:6650
完整的配置参数列表参考 PulsarClient 类的javadoc文档 。
Producers
在Pulsar中,生产者写消息到主题中。 一旦你实例化一个PulsarClient 客户端对象(在如上z章节),你可以创建一个Producer 生产者用于特定的主题。
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// 然后你就可以发送消息到指定的broker 和topic上:
producer.send("My message".getBytes());
By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message schema.
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
在不再使用时,你需要确保关闭生产者、消费者和客户端:
producer.close(); consumer.close(); client.close();
关闭操作也可以是异步的:
```java
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"));
.exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex);
return ex;
});
生产者配置
如果实例化 生产者
对象时仅指定主题topic名称 (如上面的示例所示), 则生产者将使用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。 For a full listing, see the Javadoc for the ProducerBuilder class. Here’s an example:
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
消息路由
当使用分区主题时,当你使用生产者发布消息时你可以指定路由模式。 有关使用 Java 客户端指定路由模式的更多内容, 请参见 分区主题 cookbook。
异步发送
你可以使用Java客户端异步发布消息。 使用异步发送,生产者将消息放入阻塞队列并立即返回。 然后,客户端将在后台将消息发送给broker。 如果队列已满(最大值可配置),则在调用API时,生产者可能会被阻塞或立即失败,具体取决于传递给生产者的参数。
以下是异步发送操作的示例:
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
As you can see from the example above, async send operations return a MessageId wrapped in a CompletableFuture
.
消息配置
除了value之外, 还可以在特定消息上设置其他选项:
producer.newMessage()
.key("my-message-key")
.value("my-async-message".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
对于前一种情况,也可以使用sendAsync()
来终止构建器链,并获取future返回值。
Consumers
在Pulsar中,消费者订阅topic主题并处理生产者发布到这些主题的消息。 你可以首先实例化一个PulsarClient 对象并传递给他一个borker(如上所示) URL来实例化一个消费者。
一旦实例化一个PulsarClient 对象,你可以指定一个主题和一个订阅来创建一个 Consumer 消费者。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
subscribe
方法将自动将订阅消费者指定的主题和订阅。 一种让消费者监听主题的方法是使用while
循环。 In this example loop, the consumer listens for messages, prints the contents of any message that’s received, and then acknowledges that the message has been processed. If the processing logic fails, we use negative acknowledgement to have the message redelivered at a later point in time.
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
消费者配置
如果实例化 消费者
对象, 仅指定主题和订阅名称, 如上面的示例所示, 消费者将采用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。 有关完整列表, 请参阅 ConsumerBuilder 类javadoc文档。 Here’s an example:
这是一个示例配置:
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
异步接收
receive
方法将异步接受消息(消费者处理器将被阻塞,直到有消息到达)。 你也可以使用异步接收方法,这将在一个新消息到达时立即返回一个CompletableFuture
对象。
Here’s an example:
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
Async receive operations return a Message wrapped inside of a CompletableFuture
.
多主题订阅
消费者除了订阅单个Pulsar主题外,你还可以使用多主题订阅订阅多个主题。 若要使用多主题订阅, 可以提供一个topic正则表达式 (regex) 或 主题List
。 如果通过 regex 选择主题, 则所有主题都必须位于同一Pulsar命名空间中。
下面是一些示例:
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// 订阅命名空间中的所有主题
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// 使用regex订阅命名空间中的主题子集
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
你还可以订阅明确的主题列表 (如果愿意, 可跨命名空间):
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// 或者:
Consumer multiTopicConsumer = consumerBuilder
.topics(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
You can also subscribe to multiple topics asynchronously using the subscribeAsync
method rather than the synchronous subscribe
method. Here’s an example:
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(consumer -> {
do {
try {
Message msg = consumer.receive();
// Do something with the received message
} catch (PulsarClientException e) {
e.printStackTrace();
}
} while (true);
});
Reader 接口
使用 reader 接口, Pulsar客户可以在主题中“手动定位”自己,从指定的消息开始向前读取所有消息。 The Pulsar API for Java enables you to create Reader objects by specifying a topic, a MessageId , and ReaderConfiguration .
Here’s an example:
ReaderConfiguration conf = new ReaderConfiguration();
byte[] msgIdBytes = // 一些消息ID 的字节数组
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message message = reader.readNext();
// 处理消息
}
在上面的示例中,实例化一个Reader
对象对指定的主题和消息(ID); reader将遍历主题中msgIdBytes
(取值方式取决于应用程序) 之后的消息。
上面的示例代码展示了Reader
对象指向特定的消息(ID),但你也可以使用MessageId.earliest
来指向topic上最早可用的消息,使用MessageId.latest
指向最新的消息。
Schema
在Pulsar中,所有的消息数据都在字节数组中,消息schema允许在构造和处理消息时使用其他类型的数据(从简单类型(如字符串)到更复杂的特定应用程序的类型)。 如果在不指定schema的情况下构造 生产者,则生产者只能生成类型为 byte[]
的消息。 Here’s an example:
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
上面的生产者相当于 Producer<byte[]>
(实际上, 你应该 总是 显式指定类型)。 如果你想让产生者使用不同类型的数据,你需要指定一个schema来通知Pulsar 在topic上传输哪种类型的数据。
Schema实例
假设您有一个 SensorReading
类, 你想通过Pulsar主题进行传输:
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
你可以创建一个Producer<SensorReading>
(或Consumer<SensorReading>
)像这样:
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
以下schema格式目前可用于 Java:
- 无schema 或者字节数组schema(可以使用
Schema.BYTES
):
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
.topic("some-raw-bytes-topic")
.create();
Or, equivalently:
Producer<byte[]> bytesProducer = client.newProducer()
.topic("some-raw-bytes-topic")
.create();
String
for normal UTF-8-encoded string data. This schema can be applied usingSchema.STRING
:Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("some-string-topic")
.create();
JSON schemas can be created for POJOs using the
JSONSchema
class. Here’s an example:Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
.topic("some-pojo-topic")
.create();
Authentication
Pulsar currently supports two authentication schemes: TLS and Athenz. The Pulsar Java client can be used with both.
TLS Authentication
To use TLS, you need to set TLS to true
using the setUseTls
method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.
Here’s an example configuration:
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(tlsAuth)
.build();
Athenz
要使用Athenz做为身份认证提供者,你需要use TLS并且在hash提供如下四个参数的值:
tenantDomain
tenantService
providerDomain
privateKey
You can also set an optional keyId
. 这是一个示例配置:
Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(athenzAuth)
.build();
支持的格式:
privateKey
参数支持如下三种格式: *file:///path/to/file
*file:/path/to/file
*data:application/x-pem-file;base64,<base64-encoded value>