Pulsar WebSocket API
通过 Pulsar 的WebSocket API,用户可以简单便捷地与 Pulsar 进行交互,WebSocket API 不依赖于官方客户端库。 通过 WebSocket ,您可以发布和消费消息,并使用在 客户端特征矩阵 页面上可用的功能。
You can use Pulsar WebSocket API with any WebSocket client library. See examples for Python and Node.js below.
运行 WebSocket 服务
推荐使用单机模式的 Pulsar 进行开发,在本地开发环境中启用 WebSocket 服务。
在非单机模式下,有两种方法可以部署 WebSocket 服务:
嵌入 Pulsar Broker
在这种模式下,WebSocket 服务会使用已经在 broker 中运行的 HTTP 服务。 要启用此模式,需在安装目录下的 conf/broker.conf 文件中设置webSocketServiceEnabled 参数。
webSocketServiceEnabled=true
作为一个独立的组件
在这种模式下,WebSocket 会作为单独的服务在 Pulsar broker 上运行。 运行此模式,需在 conf/websocket.conf 文件中进行配置。 You’ll need to set at least the following parameters:
下面是一个示例:
configurationStoreServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster
安全设置
在 WebSocket 服务上启用 TLS 加密:
tlsEnabled=true
tlsCertificateFilePath=/path/to/broker.cert.pem
tlsKeyFilePath=/path/to/broker.key-pk8.pem
tlsTrustCertsFilePath=/path/to/ca.cert.pem
启动 Broker
配置完成后,你可以使用 pulsar-daemon 命令来启动服务:
$ bin/pulsar-daemon start websocket
API 手册:
Pulsar 的 WebSocket API 提供三个端点,用于生产消息、消费消息和阅读消息。
所有通过 WebSocket API 的数据都使用 JSON 进行交互。
认证
Browser javascript WebSocket client
使用查询参数 token
传送身份验证令牌。
ws://broker-service-url:8080/path?token=token
Producer 端
Producer 端需要在 URL 中指定租户、命名空间和 topic,例如:
ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
查询参数
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
sendTimeoutMillis | long | 否 | 发送超时(默认值:30秒) |
batchingEnabled | boolean | 否 | 启用批量缓存消息(默认值:false) |
batchingMaxMessages | int | 否 | 批量消息数最大值(默认值:1000) |
maxPendingMessages | int | 否 | 设置消息内部队列的最大值(默认值:1000) |
batchingMaxPublishDelay | long | 否 | 批量处理消息的时间(默认:10毫秒) |
messageRoutingMode | string | 否 | 分区 producer 的消息路由模式:SingleSection 或 RoundRobinSection |
compressionType | string | 否 | 压缩类型:LZ4 或 ZLIB |
producerName | string | 否 | Specify the name for the producer. Pulsar will enforce only one producer with same name can be publishing on a topic |
initialSequenceId | long | 否 | 设置 producer 发布消息序列 id 的标准。 |
hashingScheme | string | 否 | 在分区 topic 上发布消息时使用的哈希函数:JavaStringHash 或 Murmur3_32Hash |
token | string | 否 | 身份验证令牌,这用于浏览器的 javascript 客户端 |
发布消息
{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
payload | string | 是 | Base-64 编码的负载 |
properties | 键值对 | 否 | 应用程序定义的属性 |
context | string | 否 | 应用程序定义的请求标识符 |
key | string | 否 | 分区 topic 中使用的分区 |
replicationClusters | 数组 | 否 | 根据名称允许添加到集群列表的副本 |
响应成功示例
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
响应失败示例
{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
result | string | 是 | 发送成功则为 ok ,否则抛出异常 |
messageId | string | 是 | 已发布消息的 Message ID |
context | string | 否 | 应用程序定义的请求标识符 |
Consumer 端
Concumer 端要求在 URL 中指定租户、命名空间、topic 和订阅:
ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
查询参数
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
ackTimeoutMillis | long | 否 | 设置未完成消息确认的超时时间(默认值:0) |
subscriptionType | string | 否 | 订阅类型:独占 、灾备 、共享 、key共享 |
receiverQueueSize | int | 否 | Consumer 接收队列的大小(默认:1000) |
consumerName | string | 否 | Consumer 的名称 |
priorityLevel | int | 否 | 指定 consumer 的优先级 |
maxRedeliverCount | int | 否 | 为 consumer 指定 maxRedeliverCount(默认值:0)。 启用 Dead Letter Topic。 |
deadLetterTopic | string | 否 | 为 consumer 指定 deadLetterTopic(默认值:{topic}-{subscription}-DLQ)。 启用 Dead Letter Topic。 |
pullMode | boolean | 否 | Enable pull mode (default: false). See “Flow Control” below. |
negativeAckRedeliveryDelay | int | 否 | When a message is negatively acknowledged, it will be redelivered to the DLQ. |
token | string | 否 | 身份验证令牌,这用于浏览器的 javascript 客户端 |
注意:以上参数(pullMode
除外)适用于 WebSocket 服务的内部 consumer。 因此,即使客户端没有在 WebSocket 上消费,只要消息进入接收队列,就会受到传递设置的约束。
接收消息
Server will push messages on the WebSocket session:
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785",
"redeliveryCount": 4
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 消息 ID |
payload | string | 是 | Base-64 编码的负载 |
publishTime | string | 是 | 发布时间戳 |
redeliveryCount | number | 是 | 此消息已经发送的次数 |
properties | 键值对 | 否 | 应用程序定义的属性 |
key | string | 否 | Producer 设置的原始路由密钥 |
ACK 确认消息
Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.
{
"messageId": "CAAQAw=="
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 处理消息的消息ID |
ACK 否认消息
{
"type": "negativeAcknowledge",
"messageId": "CAAQAw=="
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 处理消息的消息ID |
流量控制
推送模式
默认情况下(pullMode=false
),consumer 端使用 receiverQueueSize
参数设置内部接收队列的大小,并限制传递到 WebSocket 客户端的未确认消息数。 在这种模式下,如果不发送消息确认,发送到 WebSocket 客户端的消息达到 receiverQueueSize
时,Pulsar WebSocket 将停止发送消息。
拉取模式
如果设置 pullMode
为 true
,则 WebSocket 客户端需要使用 permit
命令允许 Pulsar WebSocket 服务发送更多消息。
{
"type": "permit",
"permitMessages": 100
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
type | string | 是 | Type of command. Must be permit |
permitMessages | int | 是 | 允许的消息数量 |
注意:在这种模式下,可以在不同的连接中确认消息。
检查是否到达主题的结尾
消费者可以通过发送 isEndOfTopic
请求来检查它是否已经到达主题的结尾。
Request
{
"type": "isEndOfTopic"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
type | string | 是 | Type of command. Must be isEndOfTopic |
Response
{
"endOfTopic": "true/false"
}
Reader 端
The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:
ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
查询参数
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
readerName | string | 否 | Reader name |
receiverQueueSize | int | 否 | Consumer 接收队列的大小(默认:1000) |
messageId | int or enum | 否 | Message ID to start from, earliest or latest (default: latest ) |
token | string | 否 | 身份验证令牌,这用于浏览器的 javascript 客户端 |
接收消息
Server will push messages on the WebSocket session:
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785",
"redeliveryCount": 4
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 消息 ID |
payload | string | 是 | Base-64 编码的负载 |
publishTime | string | 是 | 发布时间戳 |
redeliveryCount | number | 是 | 此消息已经发送的次数 |
properties | 键值对 | 否 | 应用程序定义的属性 |
key | string | 否 | Producer 设置的原始路由密钥 |
ACK 确认消息
In WebSocket, Reader needs to acknowledge the successful processing of the message to have the Pulsar WebSocket service update the number of pending messages. If you don’t send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.
{
"messageId": "CAAQAw=="
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 处理消息的消息ID |
检查是否到达主题的结尾
消费者可以通过发送 isEndOfTopic
请求来检查它是否已经到达主题的结尾。
Request
{
"type": "isEndOfTopic"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
type | string | 是 | Type of command. Must be isEndOfTopic |
Response
{
"endOfTopic": "true/false"
}
错误代码
In case of error the server will close the WebSocket session using the following error codes:
Error Code | Error Message |
---|---|
1 | Failed to create producer |
2 | Failed to subscribe |
3 | Failed to deserialize from JSON |
4 | Failed to serialize to JSON |
5 | Failed to authenticate client |
6 | Client is not authorized |
7 | Invalid payload encoding |
8 | Unknown error |
应用程序负责在后台重新建立 WebSocket 连接。
客户端示例
Below you’ll find code examples for the Pulsar WebSocket API in Python and Node.js.
Python
This example uses the websocket-client package. You can install it using pip:
$ pip install websocket-client
You can also download it from PyPI.
Python producer
Here’s an example Python producer that sends a simple message to a Pulsar topic:
#!/usr/bin/python
#coding=utf-8
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
# Send one message as JSON
ws.send(json.dumps({
'payload' : base64.b64encode('Hello World'),
'properties': {
'key1' : 'value1',
'key2' : 'value2'
},
'context' : 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print 'Message published successfully'
else:
print 'Failed to publish message:', response
ws.close()
Python consumer
Here’s an example Python consumer that listens on a Pulsar topic and prints the message ID whenever a message arrives:
#!/usr/bin/python
#coding=utf-8
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Python reader
Here’s an example Python reader that listens on a Pulsar topic and prints the message ID whenever a message arrives:
#!/usr/bin/python
#coding=utf-8
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Node.js
This example uses the ws package. You can install it using npm:
$ npm install ws
Node.js producer
Here’s an example Node.js producer that sends a simple message to a Pulsar topic:
const WebSocket = require('ws');
// 如果设置启用TLS 为 true,您必须设置 tls 启用为 conf/websocket.conf。
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/producer/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);
var message = {
"payload" : new Buffer("Hello World").toString('base64'),
"properties": {
"key1" : "value1",
"key2" : "value2"
},
"context" : "1"
};
ws.on('open', function() {
// Send one message
ws.send(JSON.stringify(message));
});
ws.on('message', function(message) {
console.log('received ack: %s', message);
});
Node.js consumer
Here’s an example Node.js consumer that listens on the same topic used by the producer above:
const WebSocket = require('ws');
// 如果设置启用TLS 为 true,您必须设置 tls 启用为 conf/websocket.conf。
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub`;
const ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});
NodeJS reader
const WebSocket = require('ws');
// 如果设置启用TLS 为 true,您必须设置 tls 启用为 conf/websocket.conf。
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/reader/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});