Pulsar Clients

Pulsar 推出了支持 JavaGoPythonC++ 的客户端 API。 Puslar API 优化并封装了客户端到 broker 端的通信协议,暴露出一套简单直观的 API 供应用程序使用。

底层实现上,目前官方版的Pulsar客户端支持对用户透明的连接重连、故障切换、未ack消息的缓冲、消息重传。

自定义客户端库

如果您想创建自己的客户端库, 我们建议参考Pulsar的自定义 二进制协议 的文档。

客户端使用步骤

当应用程序要创建生产者/消费者时, Pulsar客户端库执行按以下两个步骤的工作:

  1. 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询Zookeeper中(缓存)的元数据,来确定这条消息的topic在哪个broker上,如果该topic不在任何一个broker上,则把这个topic分配在负载最少的broker上。
  2. 当客户端获取了broker的地址之后,将会创建一个TCP连接(或复用连接池中的连接)并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。

每当 TCP 连接中断时, 客户端将立即重新启动此安装阶段, 并将继续尝试使用指数退避重新建立生产者或使用者, 直到操作成功为止。

Reader 接口

在Pulsar中, “标准” 消费者接口 涉及使用消费者监听 主题, 处理传入消息, 并在处理完这些消息后最终确认它们。 不论任何时候创建的一个新订阅,默认都会定位在 topic 的末尾位置,这意味着使用该订阅的消费者都只能接收在这之后新产生的消息。 如果消费者使用已经存在的订阅来连接 topic 时,它将从该订阅内最早的未确认消息开始读取。 总之,消费者接口是基于消息确认机制来自动管理订阅游标位置。

The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—-rather than a consumer—-you need to specify which message the reader begins reading from when it connects to a topic. 当连接到一个 topic 时,reader 接口支持的开始位置包括:

  • The earliest available message in the topic
  • The latest available message in the topic
  • 如果你想开始的位置在最早和最新之间, 则需要显示的指定消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。

Reader 接口对流处理系统中,需要用到 effectively-once(仅仅一次) 语义的场景是很有帮助的。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。

Reader 接口内部是作为一个使用独占、非持久化订阅的被随机命名的一个消费者来实现的。

[ IMPORTANT ]

Unlike subscription/consumer, readers are non-durable in nature and will not prevent data in a topic from being deleted, thus it is strongly advised that data retention be configured. 如果主题没有配置足够长的消息保留时间,就会出现消息还没有被读取就被删除的情况。 这将导致 reader 读取不到这条消息。 为主题配置数据保留时间,就可以保证 reader 可以在一定时间内可以获取到该消息。

请注意 reader 可以有一个 “backlog”,但是该指标只是为了让用户了解到 reader 背后的运行情况,但是在进行任何积压配额计算是都不会考虑该因素。

Pulsar的消费者和读取器接口

仅支持没有分区的 topic

Reader 接口目前无法在有分区主题(partitioned topics)上使用。

下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageId;
  3. import org.apache.pulsar.client.api.Reader;
  4. // Create a reader on a topic and for a specific message (and onward)
  5. Reader<byte[]> reader = pulsarClient.newReader()
  6. .topic("reader-api-test")
  7. .startMessageId(MessageId.earliest)
  8. .create();
  9. while (true) {
  10. Message message = reader.readNext();
  11. // Process the message
  12. }

创建一个从最新可用消息处开始读取消息的读取器

  1. Reader<byte[]> reader = pulsarClient.newReader()
  2. .topic(topic)
  3. .startMessageId(MessageId.latest)
  4. .create();

创建一个从其他位置(非最早可用且非最新可用消息处)读取消息的读取器

  1. byte[] msgIdBytes = // Some byte array
  2. MessageId id = MessageId.fromByteArray(msgIdBytes);
  3. Reader<byte[]> reader = pulsarClient.newReader()
  4. .topic(topic)
  5. .startMessageId(id)
  6. .create();