Pulsar Clients

Pulsar 暴露了支持 JavaGoPythonC++C# 等语言的客户端 API。 Pulsar API 优化并封装了客户端到 broker 端的通信协议,暴露出一套简单直观的 API 供应用程序使用。

底层实现上,目前官方版的 Pulsar 客户端支持对 broker 的透明连接/重连、故障切换、未 ack 消息的缓冲、消息重传。

Custom client libraries If you’d like to create your own client library, we recommend consulting the documentation on Pulsar’s custom binary protocol.

客户端设置步骤

在应用程序创建生产者/消费者之前,Pulsar 客户端库需要启动一个设置阶段,包括两个步骤:

  1. 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询 ZooKeeper 中 (缓存) 的元数据,来确定这条消息的 topic 在哪个 broker 上,如果该 topic 不在任何一个 broker 上,则把这个 topic 分配在负载最少的 broker 上。
  2. 当客户端获取了broker的地址之后,将会创建一个TCP连接 (或复用连接池中的连接) 并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。

每当 TCP 连接中断时, 客户端将立即重新启动上述步骤的设置阶段, 并将继续尝试使用指数退避重新建立生产者或消费者, 直到上述步骤执行成功为止。

Reader 接口

在Pulsar中, “标准” 消费者接口 涉及使用消费者监听 主题, 处理传入消息, 并在处理完这些消息后最终确认它们。 不论任何时候创建的一个新订阅,默认都会定位在 topic 的末尾位置,这意味着使用该订阅的消费者都只能接收在这之后新产生的消息。 如果消费者使用已经存在的订阅来连接 topic 时,它将从该订阅内最早的未确认消息开始读取。 总之,消费者接口是基于消息确认机制来自动管理订阅游标位置。

The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—-rather than a consumer—-you need to specify which message the reader begins reading from when it connects to a topic. 当连接到一个 topic 时,reader 接口支持的开始位置包括:

  • The earliest available message in the topic
  • The latest available message in the topic
  • 在最早和最新之间的其他消息。 如果你选择此选项,则需要明确提供消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。

Reader 接口对流处理系统中,需要用到 effectively-once(仅仅一次) 语义的场景是很有帮助的。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。

Reader 接口内部是作为一个使用独占、非持久化订阅的被随机命名的一个消费者来实现的。

[ IMPORTANT ]

Unlike subscription/consumer, readers are non-durable in nature and does not prevent data in a topic from being deleted, thus it is strongly advised that data retention be configured. 如果主题没有配置足够长的消息保留时间,就会出现消息还没有被 Reader 读取就被删除的情况。 这将导致 reader 读取不到这条消息。 为主题配置数据保留时间,就可以保证 reader 可以在一定时间内可以获取到该消息。

Please also note that a reader can have a “backlog”, but the metric is only used for users to know how behind the reader is. The metric is not considered for any backlog quota calculations.

Pulsar的消费者和读取器接口

下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageId;
  3. import org.apache.pulsar.client.api.Reader;
  4. // Create a reader on a topic and for a specific message (and onward)
  5. Reader<byte[]> reader = pulsarClient.newReader()
  6. .topic("reader-api-test")
  7. .startMessageId(MessageId.earliest)
  8. .create();
  9. while (true) {
  10. Message message = reader.readNext();
  11. // Process the message
  12. }

创建一个从最新可用消息处开始读取消息的 reader:

  1. Reader<byte[]> reader = pulsarClient.newReader()
  2. .topic(topic)
  3. .startMessageId(MessageId.latest)
  4. .create();

创建一个从最早和最新位置之间读取消息的 reader:

  1. byte[] msgIdBytes = // Some byte array
  2. MessageId id = MessageId.fromByteArray(msgIdBytes);
  3. Reader<byte[]> reader = pulsarClient.newReader()
  4. .topic(topic)
  5. .startMessageId(id)
  6. .create();