The Pulsar Node.js client
You can use the Pulsar Node.js client to create Pulsar producers, consumers, and readers in Node.js.
All the methods in producers, consumers, and readers of the Node.js client are thread-safe.
For 1.3.0 or later versions, type definitions used in TypeScript are available.
Installation
You can install the pulsar-client library via npm.
Requirements
The Pulsar Node.js client library is based on the C++ client library. Follow these instructions to install the Pulsar C++ client library.
Compatibility
Compatibility between each version of the Node.js client and the C++ client is as follows:
Node.js client | C++ client |
---|---|
1.0.0 | 2.3.0 or later |
1.1.0 | 2.4.0 or later |
1.2.0 | 2.5.0 or later |
If an incompatible version of the C++ client is installed, you may fail to build or run this library.
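The compatibility table can be encoded as a small lookup, for example as a check before building. This is a minimal sketch and not part of pulsar-client: the names MIN_CPP_VERSION, compareVersions, and isCompatible are hypothetical, and it assumes plain major.minor.patch version strings.

```javascript
// Minimum C++ client version for each Node.js client version,
// copied from the compatibility table above.
const MIN_CPP_VERSION = {
  '1.0.0': '2.3.0',
  '1.1.0': '2.4.0',
  '1.2.0': '2.5.0',
};

// Compare two "major.minor.patch" strings numerically.
// Returns a negative number, zero, or a positive number.
function compareVersions(a, b) {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < 3; i += 1) {
    const diff = (pa[i] || 0) - (pb[i] || 0);
    if (diff !== 0) return diff;
  }
  return 0;
}

// Hypothetical helper: true when the installed C++ client is new enough.
// Returns undefined for Node.js client versions not listed in the table.
function isCompatible(nodeClientVersion, cppClientVersion) {
  const minimum = MIN_CPP_VERSION[nodeClientVersion];
  if (minimum === undefined) return undefined;
  return compareVersions(cppClientVersion, minimum) >= 0;
}

console.log(isCompatible('1.1.0', '2.4.2'));
console.log(isCompatible('1.2.0', '2.4.2'));
```

Comparing the parts numerically rather than as strings matters for versions such as 2.10.0, which would otherwise sort before 2.3.0.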
Install using npm
Install the pulsar-client library via npm:
$ npm install pulsar-client
Note
Because this library uses the node-addon-api module to wrap the C++ library, it only works in Node.js 10.x or later.
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme, and have a default port of 6650. Here is an example for localhost:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you are using TLS encryption or TLS Authentication, the URL looks like this:
pulsar+ssl://pulsar.us-west.example.com:6651
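The URL shapes above can be assembled programmatically. The following is a minimal sketch: buildServiceUrl is a hypothetical helper, not part of pulsar-client, and the 6651 fallback for TLS is an assumption taken from the example URL above.

```javascript
// Hypothetical helper that assembles a Pulsar protocol URL.
// Fallback ports follow the examples above: 6650 for pulsar://
// and 6651 for pulsar+ssl:// (the latter is an assumption).
function buildServiceUrl(host, { tls = false, port } = {}) {
  const scheme = tls ? 'pulsar+ssl' : 'pulsar';
  const resolvedPort = port !== undefined ? port : (tls ? 6651 : 6650);
  return `${scheme}://${host}:${resolvedPort}`;
}

console.log(buildServiceUrl('localhost'));
console.log(buildServiceUrl('pulsar.us-west.example.com', { tls: true }));
```

The returned string is what you would pass as serviceUrl when creating a client.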
Create a client
To interact with Pulsar, you first need a client object. You can create a client instance using the new operator and the Client constructor, passing in a client options object (more on configuration below).
Here is an example:
const Pulsar = require('pulsar-client');

(async () => {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  await client.close();
})();
Client configuration
The following configurable parameters are available for Pulsar clients:
Parameter | Description | Default |
---|---|---|
serviceUrl | The connection URL for the Pulsar cluster. For more details, see the connection URL section above. | |
authentication | Configure the authentication provider (default: no authentication). For more details, see TLS authentication. | |
operationTimeoutSeconds | The timeout, in seconds, for Node.js client operations (creating producers, subscribing to and unsubscribing from topics). Retries occur until this threshold is reached, at which point the operation fails. | 30 |
ioThreads | The number of threads used for handling connections to Pulsar brokers. | 1 |
messageListenerThreads | The number of threads used by message listeners (consumers and readers). | 1 |
concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. Modify the default of 50000 only if the client needs to produce to and/or subscribe to thousands of Pulsar topics. | 50000 |
tlsTrustCertsFilePath | The file path of the trusted TLS certificate. | |
tlsValidateHostname | Whether to enable TLS hostname verification. | false |
tlsAllowInsecureConnection | Whether the Pulsar client accepts untrusted TLS certificates from the broker. | false |
statsIntervalInSeconds | The interval between two stats reports. Stats are enabled when statsInterval is positive. The minimum value is 1 second. | 600 |
log | A function that is used for logging. | console.log |
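To see how the documented defaults combine with your own settings, the table can be written out as a plain object. This is an illustrative sketch only: CLIENT_DEFAULTS and resolveClientOptions are hypothetical names, the client applies its own defaults internally, and treating serviceUrl as mandatory here is an assumption.

```javascript
// Defaults copied from the client configuration table above.
const CLIENT_DEFAULTS = {
  operationTimeoutSeconds: 30,
  ioThreads: 1,
  messageListenerThreads: 1,
  concurrentLookupRequest: 50000,
  tlsValidateHostname: false,
  tlsAllowInsecureConnection: false,
  statsIntervalInSeconds: 600,
};

// Hypothetical helper: merge caller options over the documented defaults.
// Assumption: serviceUrl must always be supplied by the caller.
function resolveClientOptions(options) {
  if (!options || typeof options.serviceUrl !== 'string') {
    throw new TypeError('serviceUrl is required');
  }
  return { ...CLIENT_DEFAULTS, ...options };
}

const options = resolveClientOptions({
  serviceUrl: 'pulsar://localhost:6650',
  operationTimeoutSeconds: 10,
});
console.log(options);
```

Only the keys you pass override the defaults, so the merged object shows at a glance which values differ from the table.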
Producers
Pulsar producers publish messages to Pulsar topics. You can configure Node.js producers using a producer configuration object.
Here is an example:
const producer = await client.createProducer({
  topic: 'my-topic',
});

await producer.send({
  data: Buffer.from("Hello, Pulsar"),
});

await producer.close();
Promise operation
When you create a new Pulsar producer, the operation returns a Promise object, which resolves to the producer instance or rejects with an error. The example above uses the await operator instead of an executor function.
Producer operations
Pulsar Node.js producers have the following methods available:
Method | Description | Return type |
---|---|---|
send(Object) | Publishes a message to the producer’s topic. The returned Promise resolves with the message ID when the Pulsar broker acknowledges the message, or rejects if an error occurs. | Promise<Object> |
flush() | Sends the messages in the send queue to the Pulsar broker. The returned Promise resolves when the Pulsar broker acknowledges the messages, or rejects if an error occurs. | Promise<null> |
close() | Closes the producer and releases all resources allocated to it. Once close() is called, no more messages are accepted from the publisher. The returned Promise resolves when all pending publish requests are persisted by Pulsar. If an error is thrown, no pending writes are retried. | Promise<null> |
getProducerName() | Getter method of the producer name. | string |
getTopic() | Getter method of the name of the topic. | string |
Producer configuration
Parameter | Description | Default |
---|---|---|
topic | The Pulsar topic to which the producer publishes messages. | |
producerName | A name for the producer. If you do not explicitly assign a name, Pulsar automatically generates a globally unique name. If you choose to explicitly assign a name, it needs to be unique across all Pulsar clusters, otherwise the creation operation throws an error. | |
sendTimeoutMs | When publishing a message to a topic, the producer waits for an acknowledgment from the responsible Pulsar broker. If a message is not acknowledged within the threshold set by this parameter, an error is thrown. If you set sendTimeoutMs to -1, the timeout is set to infinity (and thus removed). Removing the send timeout is recommended when using Pulsar’s message de-duplication feature. | 30000 |
initialSequenceId | The initial sequence ID of the messages. The producer adds a sequence ID to each message it sends, and the ID increases with every message sent. | |
maxPendingMessages | The maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). By default, when the queue is full, all calls to the send method fail unless blockIfQueueFull is set to true. | 1000 |
maxPendingMessagesAcrossPartitions | The maximum total size of the pending message queues across all partitions. | 50000 |
blockIfQueueFull | If set to true, the producer’s send method waits when the outgoing message queue is full rather than failing and throwing an error (the size of that queue is dictated by the maxPendingMessages parameter); if set to false (the default), send operations fail and throw an error when the queue is full. | false |
messageRoutingMode | The message routing logic (for producers on partitioned topics). This logic is applied only when no key is set on messages. The available options are round robin (RoundRobinDistribution) or publishing all messages to a single partition (UseSinglePartition, the default). | UseSinglePartition |
hashingScheme | The hashing function that determines the partition on which a particular message is published (partitioned topics only). The available options are JavaStringHash (the equivalent of String.hashCode() in Java), Murmur3_32Hash (applies the Murmur3 hashing function), or BoostHash (applies the hashing function from C++’s Boost library). | BoostHash |
compressionType | The message data compression type used by the producer. The available options are LZ4, Zlib, ZSTD, and SNAPPY. | None (no compression) |
batchingEnabled | If set to true, the producer sends messages in batches. | true |
batchingMaxPublishDelayMs | The maximum delay, in milliseconds, before a batch of messages is sent. | 10 |
batchingMaxMessages | The maximum number of messages in a batch. | 1000 |
properties | The metadata of the producer. | |
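To make the JavaStringHash option more concrete, here is a sketch of how a key can be hashed the way Java's String.hashCode() does and then mapped to a partition. The function names are hypothetical, and reducing the hash with a non-negative modulo is an assumption about the general technique, not a description of the client's exact internals.

```javascript
// JavaScript port of Java's String.hashCode(): h = 31 * h + charCode,
// wrapped to a 32-bit signed integer at every step.
function javaStringHash(key) {
  let hash = 0;
  for (let i = 0; i < key.length; i += 1) {
    hash = (Math.imul(31, hash) + key.charCodeAt(i)) | 0;
  }
  return hash;
}

// Assumption: the partition index is the hash reduced modulo the
// partition count, adjusted so the result is never negative.
function choosePartition(key, numPartitions) {
  const hash = javaStringHash(key);
  return ((hash % numPartitions) + numPartitions) % numPartitions;
}

console.log(javaStringHash('abc')); // 96354, same as "abc".hashCode() in Java
console.log(choosePartition('abc', 4));
```

Because the mapping is deterministic, messages with the same key always land on the same partition as long as the partition count does not change.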
Producer example
This example creates a Node.js producer for the my-topic topic and sends 10 messages to that topic:
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Create a producer
  const producer = await client.createProducer({
    topic: 'my-topic',
  });

  // Send messages
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();
Consumers
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Node.js consumers using a consumer configuration object.
Here is an example:
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'my-subscription',
});

const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);

await consumer.close();
Promise operation
When you create a new Pulsar consumer, the operation returns a Promise object, which resolves to the consumer instance or rejects with an error. The example above uses the await operator instead of an executor function.
Consumer operations
Pulsar Node.js consumers have the following methods available:
Method | Description | Return type |
---|---|---|
receive() | Receives a single message from the topic. The returned Promise resolves with the message object when a message is available. | Promise<Object> |
receive(Number) | Receives a single message from the topic within the specified timeout, in milliseconds. | Promise<Object> |
acknowledge(Object) | Acknowledges a message to the Pulsar broker by message object. | void |
acknowledgeId(Object) | Acknowledges a message to the Pulsar broker by message ID object. | void |
acknowledgeCumulative(Object) | Acknowledges all the messages in the stream, up to and including the specified message. The acknowledgeCumulative method returns void and sends the ack to the broker asynchronously. After that, the messages are not redelivered to the consumer. Cumulative acknowledgment cannot be used with a shared subscription type. | void |
acknowledgeCumulativeId(Object) | Acknowledges all the messages in the stream, up to and including the specified message ID. | void |
negativeAcknowledge(Message) | Negatively acknowledges a message to the Pulsar broker by message object. | void |
negativeAcknowledgeId(MessageId) | Negatively acknowledges a message to the Pulsar broker by message ID object. | void |
close() | Closes the consumer so that it no longer receives messages from the broker. | Promise<null> |
unsubscribe() | Unsubscribes from the subscription. | Promise<null> |
Consumer configuration
Parameter | Description | Default |
---|---|---|
topic | The Pulsar topic on which the consumer establishes a subscription and listens for messages. | |
topics | The array of topics. | |
topicsPattern | The regular expression for topics. | |
subscription | The subscription name for this consumer. | |
subscriptionType | Available options are Exclusive, Shared, Key_Shared, and Failover. | Exclusive |
subscriptionInitialPosition | The initial position at which to set the cursor when subscribing to a topic for the first time. | SubscriptionInitialPosition.Latest |
ackTimeoutMs | The acknowledgment timeout in milliseconds. | 0 |
nAckRedeliverTimeoutMs | The delay to wait before redelivering messages that failed to be processed. | 60000 |
receiverQueueSize | Sets the size of the consumer’s receiver queue, i.e. the number of messages that can accumulate before the application calls receive. A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000 |
receiverQueueSizeAcrossPartitions | Sets the maximum total receiver queue size across partitions. This setting is used to reduce the receiver queue size for individual partitions if the total exceeds this value. | 50000 |
consumerName | The name of the consumer. Currently (v2.4.1), failover mode uses the consumer name for ordering. | |
properties | The metadata of the consumer. | |
listener | A listener that is called for each received message. | |
readCompacted | If readCompacted is enabled, a consumer reads messages from a compacted topic rather than the full message backlog of the topic. The consumer sees only the latest value for each key in the compacted topic, up to the point in the topic at which the backlog was compacted. Beyond that point, messages are delivered as normal. readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (such as failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions causes the subscription call to throw a PulsarClientException. | false |
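The readCompacted restrictions can be checked before calling subscribe. The sketch below is a hypothetical pre-flight helper, not part of pulsar-client: it inspects only the configuration object, and it assumes that non-persistent topics are recognizable by a non-persistent:// prefix in the topic name.

```javascript
// Hypothetical pre-flight check for the readCompacted rules described
// in the consumer configuration table. Returns a list of problems found.
function checkReadCompacted(config) {
  if (!config.readCompacted) return [];
  const problems = [];
  const type = config.subscriptionType || 'Exclusive'; // documented default
  if (type !== 'Exclusive' && type !== 'Failover') {
    problems.push(`readCompacted needs a single active consumer, got ${type}`);
  }
  // Assumption: non-persistent topics are identified by their URL scheme.
  if (typeof config.topic === 'string'
      && config.topic.startsWith('non-persistent://')) {
    problems.push('readCompacted requires a persistent topic');
  }
  return problems;
}

console.log(checkReadCompacted({
  topic: 'persistent://public/default/my-topic',
  subscription: 'my-subscription',
  subscriptionType: 'Shared',
  readCompacted: true,
}));
```

An empty array means the configuration does not violate the two rules checked here; the broker remains the final authority.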
Consumer example
This example creates a Node.js consumer with the my-subscription subscription on the my-topic topic, receives 10 messages, prints the content of each, and acknowledges each message to the Pulsar broker:
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Create a consumer
  const consumer = await client.subscribe({
    topic: 'my-topic',
    subscription: 'my-subscription',
    subscriptionType: 'Exclusive',
  });

  // Receive messages
  for (let i = 0; i < 10; i += 1) {
    const msg = await consumer.receive();
    console.log(msg.getData().toString());
    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();
Alternatively, you can create a consumer with a listener to process messages.
// Create a consumer
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'my-subscription',
  subscriptionType: 'Exclusive',
  listener: (msg, msgConsumer) => {
    console.log(msg.getData().toString());
    msgConsumer.acknowledge(msg);
  },
});
Reader
Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recently unacked message). You can configure Node.js readers using a reader configuration object.
Here is an example:
const reader = await client.createReader({
  topic: 'my-topic',
  startMessageId: Pulsar.MessageId.earliest(),
});

const msg = await reader.readNext();
console.log(msg.getData().toString());

await reader.close();
Reader operations
Pulsar Node.js readers have the following methods available:
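Method | Description | Return type |
---|---|---|
readNext() | Receives the next message on the topic (analogous to the receive method for consumers). The returned Promise resolves with the message object when a message is available. | Promise<Object> |
readNext(Number) | Receives a single message from the topic within the specified timeout, in milliseconds. | Promise<Object> |
hasNext() | Returns whether the broker has a next message in the target topic. | Boolean |
close() | Closes the reader so that it no longer receives messages from the broker. | Promise<null> |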
Reader configuration
Parameter | Description | Default |
---|---|---|
topic | The Pulsar topic on which the reader establishes a subscription and listens for messages. | |
startMessageId | The initial reader position, i.e. the position of the first message that the reader processes. The options are Pulsar.MessageId.earliest (the earliest available message on the topic), Pulsar.MessageId.latest (the latest available message on the topic), or any other message ID. | |
receiverQueueSize | Sets the size of the reader’s receiver queue, i.e. the number of messages that can accumulate before the application calls readNext. A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization. | 1000 |
readerName | The name of the reader. | |
subscriptionRolePrefix | The subscription role prefix. | |
readCompacted | If readCompacted is enabled, a reader reads messages from a compacted topic rather than the full message backlog of the topic. The reader sees only the latest value for each key in the compacted topic, up to the point in the topic at which the backlog was compacted. Beyond that point, messages are delivered as normal. readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (such as failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions causes the subscription call to throw a PulsarClientException. | false |
Reader example
This example creates a Node.js reader on the my-topic topic, reads 10 messages, and prints the content of each:
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  // Create a reader
  const reader = await client.createReader({
    topic: 'my-topic',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  // Read messages
  for (let i = 0; i < 10; i += 1) {
    const msg = await reader.readNext();
    console.log(msg.getData().toString());
  }

  await reader.close();
  await client.close();
})();
Messages
In the Pulsar Node.js client, you have to construct a producer message object for the producer.
Here is an example message:
const msg = {
  data: Buffer.from('Hello, Pulsar'),
  partitionKey: 'key1',
  properties: {
    'foo': 'bar',
  },
  eventTimestamp: Date.now(),
  replicationClusters: [
    'cluster1',
    'cluster2',
  ],
};

await producer.send(msg);
The following keys are available for producer message objects:
Parameter | Description |
---|---|
data | The actual data payload of the message. |
properties | A map for any application-specific metadata attached to the message. |
eventTimestamp | The timestamp associated with the message. |
sequenceId | The sequence ID of the message. |
partitionKey | The optional key associated with the message (particularly useful for things like topic compaction). |
replicationClusters | The clusters to which this message is replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default. |
deliverAt | The absolute timestamp at or after which the message is delivered. |
deliverAfter | The relative delay after which the message is delivered. |
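When many messages share the same shape, a small factory keeps the keys from the table in one place. This is a sketch under stated assumptions: buildMessage is a hypothetical helper, the payload is JSON-encoded purely for illustration, and deliverAfter is assumed to be expressed in milliseconds.

```javascript
// Hypothetical helper: build a producer message object from a
// JSON-serializable value, using the keys listed in the table above.
function buildMessage(value, { key, properties = {}, delayMs } = {}) {
  const message = {
    data: Buffer.from(JSON.stringify(value)), // payload must be a Buffer
    properties,
    eventTimestamp: Date.now(),
  };
  if (key !== undefined) message.partitionKey = key;
  // Assumption: deliverAfter takes a relative delay in milliseconds.
  if (delayMs !== undefined) message.deliverAfter = delayMs;
  return message;
}

const message = buildMessage(
  { id: 7, status: 'created' },
  { key: 'order-7', properties: { source: 'checkout' }, delayMs: 5000 },
);
console.log(message.partitionKey, message.data.toString());
```

The resulting object has the same shape as the example message above, so it could be passed to producer.send in the same way.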
Message object operations
In the Pulsar Node.js client, you can receive (or read) a message object as a consumer (or reader).
The message object has the following methods available:
Method | Description | Return type |
---|---|---|
getTopicName() | Getter method of the topic name. | String |
getProperties() | Getter method of the properties. | Array<Object> |
getData() | Getter method of the message data. | Buffer |
getMessageId() | Getter method of the message ID object. | Object |
getPublishTimestamp() | Getter method of the publish timestamp. | Number |
getEventTimestamp() | Getter method of the event timestamp. | Number |
getRedeliveryCount() | Getter method of the redelivery count. | Number |
getPartitionKey() | Getter method of the partition key. | String |
Message ID object operations
In the Pulsar Node.js client, you can get the message ID object from the message object.
The message ID object has the following methods available:
Method | Description | Return type |
---|---|---|
serialize() | Serializes the message ID into a Buffer for storing. | Buffer |
toString() | Gets the message ID as a String. | String |
The client also provides static methods on the message ID object. You can access them as Pulsar.MessageId.someStaticMethod.
The following static methods are available for the message ID object:
Method | Description | Return type |
---|---|---|
earliest() | MessageId representing the earliest (oldest) available message stored in the topic. | Object |
latest() | MessageId representing the latest (newest) available message stored in the topic. | Object |
deserialize(Buffer) | Deserializes a message ID object from a Buffer. | Object |
End-to-end encryption
End-to-end encryption allows applications to encrypt messages at producers and decrypt at consumers.
Configuration
If you want to use the end-to-end encryption feature in the Node.js client, you need to configure publicKeyPath and privateKeyPath for both producer and consumer.
publicKeyPath: "./public.pem"
privateKeyPath: "./private.pem"
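Because both paths must point at readable files, it can help to verify them before creating a producer or consumer. The following sketch uses only Node.js's built-in fs module; findUnreadableKeys is a hypothetical helper and not part of pulsar-client.

```javascript
const fs = require('fs');

// Hypothetical helper: return the names of the key options whose
// paths are missing from the config or cannot be read from disk.
function findUnreadableKeys(config) {
  return ['publicKeyPath', 'privateKeyPath'].filter((name) => {
    const path = config[name];
    if (typeof path !== 'string') return true;
    try {
      fs.accessSync(path, fs.constants.R_OK);
      return false;
    } catch (err) {
      return true;
    }
  });
}

const unreadable = findUnreadableKeys({
  publicKeyPath: './public.pem',
  privateKeyPath: './private.pem',
});
if (unreadable.length > 0) {
  console.error(`Unreadable key files: ${unreadable.join(', ')}`);
}
```

Relative paths are resolved against the current working directory of the Node.js process, so run the check from the same directory as the application.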
Tutorial
This section provides step-by-step instructions on how to use the end-to-end encryption feature in the Node.js client.
Prerequisite
- Pulsar C++ client 2.7.1 or later
Steps
Create a public and private key pair.
Input
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
Create a producer to send encrypted messages.
Input
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  // Create a producer that encrypts messages with the configured keys
  const producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
    publicKeyPath: "./public.pem",
    privateKeyPath: "./private.pem",
    encryptionKey: "encryption-key",
  });

  // Send messages
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();
Create a consumer to receive encrypted messages.
Input
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://172.25.0.3:6650',
    operationTimeoutSeconds: 30,
  });

  // Create a consumer that decrypts messages with the configured keys
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
    publicKeyPath: "./public.pem",
    privateKeyPath: "./private.pem",
  });

  // Receive messages
  for (let i = 0; i < 10; i += 1) {
    const msg = await consumer.receive();
    console.log(msg.getData().toString());
    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();
Run the consumer to receive encrypted messages.
Input
node consumer.js
In a new terminal tab, run the producer to produce encrypted messages.
Input
node producer.js
Now you can see the producer sends messages and the consumer receives messages successfully.
Output
This is from the producer side.
Sent message: my-message-0
Sent message: my-message-1
Sent message: my-message-2
Sent message: my-message-3
Sent message: my-message-4
Sent message: my-message-5
Sent message: my-message-6
Sent message: my-message-7
Sent message: my-message-8
Sent message: my-message-9
This is from the consumer side.
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9