The Pulsar Node.js client
Pulsar Node.js 客户端可用于在 Node.js 中创建 Pulsar producer、consumer 和 reader。
Node.js 客户端中 producer、consumer 和 reader 中的所有方法都对线程可用。
安装
You can install the pulsar-client
library via npm.
要求
Pulsar Node.js client library is based on the C++ client library. Follow these instructions and install the Pulsar C++ client library.
兼容性
Compatibility between each version of the Node.js client and the C++ client is as follows:
Node.js 客户端 | C++ client |
---|---|
1.0.0 | 2.3.0 及以上 |
1.1.0 | 2.4.0 or later |
1.2.0 | 2.5.0 or later |
If an incompatible version of the C++ client is installed, you may fail to build or run this library.
使用 npm 安装
Install the pulsar-client
library via npm:
$ npm install pulsar-client
Note
因为这个库使用
node-addon-api
模块来包装 C++ 库,因此只能在 Node.js 10.x 或更高版本中运行。
连接URL
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar
scheme and have a default port of 6650. Here is an example for localhost
:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you are using TLS encryption or TLS Authentication, the URL will look like something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
创建客户端
In order to interact with Pulsar, you will first need a client object. You can create a client instance using a new
operator and the Client
method, passing in a client options object (more on configuration below).
Here is an example:
const Pulsar = require('pulsar-client');
(async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
await client.close();
})();
客户端配置
The following configurable parameters are available for Pulsar clients:
参数名 | Description | 默认值 |
---|---|---|
serviceUrl | The connection URL for the Pulsar cluster. 更多详细信息,参阅这里。 | |
authentication | Configure the authentication provider. (default: no authentication). 更多详细信息,参阅 TLS 认证。 | |
operationTimeoutSeconds | Node.js 客户端操作的超时时间(创建 producer、订阅/取消订阅 topic)。 Retries will occur until this threshold is reached, at which point the operation will fail. | 30 |
ioThreads | 用于处理 Pulsar broker 连接的线程数。 | 1 |
messageListenerThreads | consumer 和 readers 监听消息使用的线程数。 | 1 |
concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. 只有当客户端需要生产及/或订阅数千 Pulsar topic 时,才需要修改预设的默认值:50000 。 | 50000 |
tlsTrustCertsFilePath | 信任的 TLS 证书的文件路径。 | |
tlsValidateHostname | 设置是否启用 TLS 主机名验证。 | false |
tlsAllowInsecureConnection | 设置是否让 Pulsar 客户端从 broker 接受不信任的 TLS 证书。 | false |
statsIntervalInSeconds | 两次信息统计之间的时间间隔。 当 statsInterval 为正数时启用信息统计。 最小值为 1 秒。 | 600 |
log | A function that is used for logging. | console.log |
Producers
Pulsar producers publish messages to Pulsar topics. You can configure Node.js producers using a producer configuration object.
Here is an example:
const producer = await client.createProducer({
topic: 'my-topic',
});
await producer.send({
data: Buffer.from("Hello, Pulsar"),
});
await producer.close();
Promise 操作
When you create a new Pulsar producer, the operation will return
Promise
object and get producer instance or an error through executor function.
In this example, using await operator instead of executor function.
Producer operations
Pulsar Node.js producers have the following methods available:
Method | Description | Return type |
---|---|---|
send(Object) | Publishes a message to the producer’s topic. 当 Pulsar broker 成功确认消息,或是准备抛出异常时,Promise 对象会运行 executor 函数。 | Promise<null> |
flush() | 从发送队列发送消息给 Pulsar broker。 当 Pulsar broker 成功确认消息,或是准备抛出异常时,Promise 对象会运行 executor 函数。 | Promise<null> |
close() | Closes the producer and releases all resources allocated to it. 如果调用 close() ,则不再接收发布者的消息。 此方法将返回 Promise 对象,并在 Pulsar 持久化所有待发布请求时运行 executor 函数。 If an error is thrown, no pending writes will be retried. | Promise<null> |
Producer 配置
参数名 | Description | 默认值 |
---|---|---|
topic | Producer 发布消息的目标 Pulsar topic。 | |
producerName | A name for the producer. 如果没有指定名称,Pulsar 将自动生成全局唯一的名称。 如果指定名称,则指定的名称必须在所有 Pulsar 集群中唯一,否则创建操作将会抛出异常。 | |
sendTimeoutMs | When publishing a message to a topic, the producer will wait for an acknowledgment from the responsible Pulsar broker. If a message is not acknowledged within the threshold set by this parameter, an error will be thrown. 如果将 sendTimeoutMs 设为 -1,那么超时时间将为无穷大(因而移除超时限定)。 Removing the send timeout is recommended when using Pulsar’s message de-duplication feature. | 30000 |
initialSequenceId | 消息的初始序列号 Producer 在发送消息时,将序列号添加到消息中。 只要发送消息,序列号就会相应增加。 | |
maxPendingMessages | The maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). 默认情况下,当队列已满时,不能继续调用 send 方法,除非将 blockIfQueueFull 设置为 true 。 | 1000 |
maxPendingMessagesAcrossPartitions | 所有分区等待队列空间总和的最大值。 | 50000 |
blockIfQueueFull | 如果设置为 true ,当消息传出队列已满时,producer 的 send 方法会进行等待,而不是抛出异常(该队列大小由 maxPendingMessages 参数决定);如果设置为 false (默认值),当队列已满时,不能调用 send ,并抛出异常。 | false |
messageRoutingMode | 消息路由逻辑(分区 topic 上的 producer 使用)。 This logic is applied only when no key is set on messages. 可以设置为循环(RoundRobinDistribution ),或是发布所有消息到单个分区 (UseSinglePartition ,默认值)。 | UseSinglePartition |
hashingScheme | The hashing function that determines the partition on which a particular message is published (partitioned topics only). 可以设置为 JavaStringHash (等同于 Java 中的 String.hashCode() )、Murmur3_32Hash (应用了 Murmur3 散列函数)、或 BoostHash (应用了 C++ 中 Boost 库的散列函数)。 | BoostHash |
compressionType | The message data compression type used by the producer. 可以设置为 LZ4 和 Zlib 。 | Compression None |
batchingEnabled | 如果设置为 true ,producer 将批量发送消息。 | true |
batchingMaxPublishDelayMs | 批量发送消息的最大延迟。 | 10 |
batchingMaxMessages | 批量发送消息时,消息大小的最大值。 | 1000 |
properties | Producer 的元数据。 |
生产者示例
This example creates a Node.js producer for the my-topic
topic and sends 10 messages to that topic:
const Pulsar = require('pulsar-client');
(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
// Create a producer
const producer = await client.createProducer({
topic: 'my-topic',
});
// Send messages
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();
await producer.close();
await client.close();
})();
Consumers
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Node.js consumers using a consumer configuration object.
Here is an example:
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
});
const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
await consumer.close();
Promise 操作
When you create a new Pulsar consumer, the operation will return
Promise
object and get consumer instance or an error through executor function.
In this example, using await operator instead of executor function.
Consumer operations
Pulsar Node.js consumers have the following methods available:
Method | Description | Return type |
---|---|---|
receive() | Receives a single message from the topic. 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。 | Promise<Object> |
receive(Number) | 在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。 | Promise<Object> |
acknowledge(Object) | 通过消息对象向 Pulsar broker 确认收到一条消息。 | void |
acknowledgeId(Object) | 通过消息 ID 向 Pulsar broker 确认收到一条消息。 | void |
acknowledgeCumulative(Object) | 确认收到流中到指定消息为止的所有消息,包括指定消息。 acknowledgeCumulative 方法返回 void,并向 broker 异步发送确认。 消息将不会重新发送给 consumer。 共享订阅类型不支持累积确认。 | void |
acknowledgeCumulativeId(Object) | 确认收到流中到指定消息为止所有消息的 ID,包括指定消息的 ID。 | void |
negativeAcknowledge(Message) | Negatively acknowledges a message to the Pulsar broker by message object. | void |
negativeAcknowledgeId(MessageId) | Negatively acknowledges a message to the Pulsar broker by message ID object. | void |
close() | 关闭 consumer,使 consumer 不再从 broker 接收消息。 | Promise<null> |
Consumer configuration
参数名 | Description | 默认值 |
---|---|---|
topic | Consumer 创建订阅并监听消息的目标 Pulsar topic。 | |
subscription | Consumer 的订阅名称。 | |
subscriptionType | 可以设置为独占 、共享 或灾备 。 | Exclusive |
subscriptionInitialPosition | Initial position at which to set cursor when subscribing to a topic at first time. | SubscriptionInitialPosition.Latest |
ackTimeoutMs | 消息确认的超时时间(以毫秒为单位)。 | 0 |
nAckRedeliverTimeoutMs | Delay to wait before redelivering messages that failed to be processed. | 60000 |
receiverQueueSize | 设置 consumer 接收队列的大小,即在应用程序调用 receive 之前允许堆积的消息数。 A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000 |
receiverQueueSizeAcrossPartitions | Set the max total receiver queue size across partitions. 当接收队列总数超过此值时,将减小每个分区接收队列的大小。 | 50000 |
consumerName | Consumer 的名称。 当前(v2.4.1)版本中, 灾备模式按照 consumer 名称排序。 | |
properties | Consumer 的元数据。 | |
listener | A listener that is called for a message received. | |
readCompacted | If enabling readCompacted , a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException . | false |
消费者示例
This example creates a Node.js consumer with the my-subscription
subscription on the my-topic
topic, receives messages, prints the content that arrive, and acknowledges each message to the Pulsar broker for 10 times:
const Pulsar = require('pulsar-client');
(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
// Create a consumer
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
subscriptionType: 'Exclusive',
});
// Receive messages
for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
}
await consumer.close();
await client.close();
})();
Instead a consumer can be created with listener
to process messages.
// Create a consumer
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
subscriptionType: 'Exclusive',
listener: (msg, msgConsumer) => {
console.log(msg.getData().toString());
msgConsumer.acknowledge(msg);
},
});
Readers
Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recently unacked message). You can configure Node.js readers using a reader configuration object.
Here is an example:
const reader = await client.createReader({
topic: 'my-topic',
startMessageId: Pulsar.MessageId.earliest(),
});
const msg = await reader.readNext();
console.log(msg.getData().toString());
await reader.close();
Reader operations
Pulsar Node.js readers have the following methods available:
Method | Description | Return type |
---|---|---|
readNext() | 接收 topic 中的下一条消息(类似于 consumer 中的 receive 方法)。 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。 | Promise<Object> |
readNext(Number) | 在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。 | Promise<Object> |
hasNext() | 返回 broker 是否在目标 topic 中存有下一条消息。 | Boolean |
close() | 关闭 reader,使 reader 不再从 broker 接收消息。 | Promise<null> |
Reader configuration
参数名 | Description | 默认值 |
---|---|---|
topic | Reader 创建订阅并监听消息的目标 Pulsar topic。 | |
startMessageId | Reader 的初始位置,即 reader 处理的第一条消息的位置。 可以设置为 Pulsar.MessageId.earliest (topic 中最早的可用消息),或 Pulsar.MessageId.latest (topic 中最新的可用消息),或是任一其他消息 ID。 | |
receiverQueueSize | 设置 reader 接收队列的大小,即在应用程序调用 readNext 之前允许堆积的消息数。 A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization. | 1000 |
readerName | Reader 的名称。 | |
subscriptionRolePrefix | The subscription role prefix. | |
readCompacted | If enabling readCompacted , a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException . | false |
Reader 示例
This example creates a Node.js reader with the my-topic
topic, reads messages, and prints the content that arrive for 10 times:
const Pulsar = require('pulsar-client');
(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
// Create a reader
const reader = await client.createReader({
topic: 'my-topic',
startMessageId: Pulsar.MessageId.earliest(),
});
// read messages
for (let i = 0; i < 10; i += 1) {
const msg = await reader.readNext();
console.log(msg.getData().toString());
}
await reader.close();
await client.close();
})();
Messages
In Pulsar Node.js client, you have to construct producer message object for producer.
Here is an example message:
const msg = {
data: Buffer.from('Hello, Pulsar'),
partitionKey: 'key1',
properties: {
'foo': 'bar',
},
eventTimestamp: Date.now(),
replicationClusters: [
'cluster1',
'cluster2',
],
}
await producer.send(msg);
The following keys are available for producer message objects:
参数名 | Description |
---|---|
data | 消息数据的实际有效负载。 |
properties | 用于存储应用程序指定消息附带的所有元数据。 |
eventTimestamp | 与消息相关联的时间戳。 |
sequenceId | 消息的序列号。 |
partitionKey | 与消息关联的可选键值(对于 topic 压缩等尤其有用)。 |
replicationClusters | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default. |
消息对象操作
In Pulsar Node.js client, you can receive (or read) message object as consumer (or reader).
The message object have the following methods available:
Method | Description | Return type |
---|---|---|
getTopicName() | Topic 名称的 Getter 方法。 | String |
getProperties() | 属性的 Getter 方法。 | 数组<Object> |
getData() | 消息数据的 Getter方法。 | Buffer |
getMessageId() | 消息 id 对象的 Getter 方法。 | 对象 |
getPublishTimestamp() | 发布时间戳的 Getter 方法。 | 数值 |
getEventTimestamp() | 事件时间戳的 Getter 方法。 | 数值 |
getRedeliveryCount() | Getter method of redelivery count. | 数值 |
getPartitionKey() | 分区键的 Getter 方法。 | String |
消息 ID 对象操作
In Pulsar Node.js client, you can get message id object from message object.
The message id object have the following methods available:
Method | Description | Return type |
---|---|---|
serialize() | 序列化消息 id 到缓存中进行存储。 | Buffer |
toString() | 获取消息 id 字符串。 | String |
The client has static method of message id object. You can access it as Pulsar.MessageId.someStaticMethod
too.
The following static methods are available for the message id object:
Method | Description | Return type |
---|---|---|
earliest() | MessageId 表示存储在 topic 中最早/最旧的可用消息。 | 对象 |
latest() | MessageId 表示存储在 topic 中最晚/最新的可用消息。 | 对象 |
deserialize(Buffer) | 从缓存中反序列化出消息 id。 | 对象 |