Pulsar 推出了支持 Java,Go,Python 和 C++ 的客户端 API。 Puslar API 优化并封装了客户端到 broker 端的通信协议,暴露出一套简单直观的 API 供应用程序使用。
底层实现上,目前官方版的Pulsar客户端支持对用户透明的连接重连、故障切换、未ack消息的缓冲、消息重传。
自定义客户端库
如果您想创建自己的客户端库, 我们建议参考Pulsar的自定义 二进制协议 的文档。
客户端使用步骤
当应用程序要创建生产者/消费者时, Pulsar客户端库执行按以下两个步骤的工作:
- 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询Zookeeper中(缓存)的元数据,来确定这条消息的topic在哪个broker上,如果该topic不在任何一个broker上,则把这个topic分配在负载最少的broker上。
- 当客户端获取了broker的地址之后,将会创建一个TCP连接(或复用连接池中的连接)并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。
每当 TCP 连接中断时, 客户端将立即重新启动此安装阶段, 并将继续尝试使用指数退避重新建立生产者或使用者, 直到操作成功为止。
Reader 接口
在Pulsar中, “标准” 消费者接口 涉及使用消费者监听 主题, 处理传入消息, 并在处理完这些消息后最终确认它们。 不论任何时候创建的一个新订阅,默认都会定位在 topic 的末尾位置,这意味着使用该订阅的消费者都只能接收在这之后新产生的消息。 如果消费者使用已经存在的订阅来连接 topic 时,它将从该订阅内最早的未确认消息开始读取。 总之,消费者接口是基于消息确认机制来自动管理订阅游标位置。
Pulsar 的 reader 接口允许应用程序手动管理游标。 当您使用 reader(而不是消费者)连接 topic 时,需要指定 reader 在连接到该 topic 时从哪条消息开始消费。 当连接到一个 topic 时,reader 接口支持的开始位置包括:
- Topic 中最早的可用消息
- Topic 中最新的可用消息
- 如果你想开始的位置在最早和最新之间, 则需要显示的指定消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。
Pulsar的读取器接口在流计算场景下,对提供effective-once的语义很有帮助。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。
仅支持没有分区的 topic
Reader 接口目前无法在有分区主题(partitioned topics)上使用。
下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
// Create a reader on a topic and for a specific message (and onward)
Reader<byte[]> reader = pulsarClient.newReader()
.topic("reader-api-test")
.startMessageId(MessageId.earliest)
.create();
while (true) {
Message message = reader.readNext();
// Process the message
}
创建一个从最新可用消息处开始读取消息的读取器
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.latest)
.create();
创建一个从其他位置(非最早可用且非最新可用消息处)读取消息的读取器
byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();