持久化有助于访问 topic,因为 topic 是发布和消费消息的逻辑端点。 Producer 发布消息到 topic,consumer 订阅 topic,并消费该 topic 上的消息。
以下说明和命令中所使用 topic 的名称结构如下:
persistent://tenant/namespace/topic
持久 topic 资源
列出 topic
以列表的形式列出指定命名空间下所有持久 topic。
pulsar-admin
可以使用 list
命令获取 topic 列表。
$ pulsar-admin persistent list \
my-tenant/my-namespace
REST API
GET /admin/v2/persistent/:tenant/:namespace
Java
String namespace = "my-tenant/my-namespace";
admin.persistentTopics().getList(namespace);
授权
授权给客户端用户,允许其在指定的 topic 上执行某些操作。
pulsar-admin
可以使用 grant-permission
命令授权。
$ pulsar-admin persistent grant-permission \
--actions produce,consume --role application1 \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
admin.persistentTopics().grantPermission(topic, role, actions);
获取权限
可以使用 permissions
命令获取权限。
pulsar-admin
$ pulsar-admin persistent permissions \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/permissions
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getPermissions(topic);
取消权限
取消已经授予客户端用户的权限。
pulsar-admin
可以使用 revoke-permission
命令取消权限。
$ pulsar-admin persistent revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
REST API
DELETE /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
admin.persistentTopics().revokePermissions(topic, role);
删除 topic
It deletes a topic. The topic cannot be deleted if there’s any active subscription or producers connected to it.
pulsar-admin
可以使用 delete
命令删除 topic。
$ pulsar-admin persistent delete \
persistent://test-tenant/ns1/tp1 \
REST API
DELETE /admin/v2/persistent/:tenant/:namespace/:topic
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().delete(topic);
卸载 topic
可以卸载 topic。
pulsar-admin
可以使用 unload
命令卸载 topic。
$ pulsar-admin persistent unload \
persistent://test-tenant/ns1/tp1 \
REST API
PUT /admin/v2/persistent/:tenant/:namespace/:topic/unload
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().unload(topic);
获取统计信息
获取非持久化 topic 的统计数据。
msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second
msgThroughputIn: Same as above, but in bytes per second instead of messages per second
msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second
msgThroughputOut: Same as above, but in bytes per second instead of messages per second
averageMsgSize: The average size in bytes of messages published within the last interval
storageSize: The sum of the ledgers’ storage size for this topic. Space used to store the messages for the topic
publishers: The list of all local publishers into the topic. There can be zero or thousands
msgRateIn: Total rate of messages published by this publisher in messages per second
msgThroughputIn: Total throughput of the messages published by this publisher in bytes per second
averageMsgSize: Average message size in bytes from this publisher within the last interval
producerId: Internal identifier for this producer on this topic
producerName: Internal identifier for this producer, generated by the client library
address: IP address and source port for the connection of this producer
connectedSince: Timestamp this producer was created or last reconnected
subscriptions: The list of all local subscriptions to the topic
my-subscription: The name of this subscription (client defined)
msgRateOut: Total rate of messages delivered on this subscription (msg/s)
msgThroughputOut: Total throughput delivered on this subscription (bytes/s)
msgBacklog: Number of messages in the subscription backlog
type: This subscription type
msgRateExpired: The rate at which messages were discarded instead of dispatched from this subscription due to TTL
lastExpireTimestamp: The last message expire execution timestamp
lastConsumedFlowTimestamp: The last flow command received timestamp
lastConsumedTimestamp: The latest timestamp of all the consumed timestamp of the consumers
lastAckedTimestamp: The latest timestamp of all the acked timestamp of the consumers
consumers: The list of connected consumers for this subscription
msgRateOut: Total rate of messages delivered to the consumer (msg/s)
msgThroughputOut: Total throughput delivered to the consumer (bytes/s)
consumerName: Internal identifier for this consumer, generated by the client library
availablePermits: The number of messages this consumer has space for in the client library’s listen queue. 值为0意味着客户端类库的队列已经满了,receive()不再被调用。 非零值意味着 consumer 可以接收消息。
unackedMessages: Number of unacknowledged messages for the consumer
blockedConsumerOnUnackedMsgs: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages
lastConsumedTimestamp: The timestamp of the consumer last consume a message
lastAckedTimestamp: The timestamp of the consumer last ack a message
replication: This section gives the stats for cross-colo replication of this topic
msgRateIn: Total rate of messages received from the remote cluster (msg/s)
msgThroughputIn: Total throughput received from the remote cluster (bytes/s)
msgRateOut: Total rate of messages delivered to the replication-subscriber (msg/s)
msgThroughputOut: Total through delivered to the replication-subscriber (bytes/s)
msgRateExpired: Total rate of messages expired (msg/s)
replicationBacklog: Number of messages pending to be replicated to remote cluster
connected: Whether the outbound replicator is connected
replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true
inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。
outboundConnection: Address of outbound replication connection
outboundConnectedSince: Timestamp of establishing outbound connection
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
pulsar-admin
可以使用 stats
命令获取 topic 的统计信息。
$ pulsar-admin persistent stats \
persistent://test-tenant/ns1/tp1 \
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/stats
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getStats(topic);
获取内部统计信息
获取 topic 的详细统计信息。
entriesAddedCounter: Messages published since this broker loaded this topic
numberOfEntries: Total number of messages being tracked
totalSize: Total storage size in bytes of all messages
currentLedgerEntries: Count of messages written to the ledger currently open for writing
currentLedgerSize: Size in bytes of messages written to ledger currently open for writing
lastLedgerCreatedTimestamp: time when last ledger was created
lastLedgerCreationFailureTimestamp: time when last ledger was failed
waitingCursorsCount: How many cursors are “caught up” and waiting for a new message to be published
pendingAddEntriesCount: How many messages have (asynchronous) write requests we are waiting on completion
lastConfirmedEntry: The ledgerid:entryid of the last message successfully written. 如果 entryid 为 -1,则 ledger 已经允许写入或正在开放写入权限,但还没有写入 entry。
state: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.
ledgers: The ordered list of all ledgers for this topic holding its messages
ledgerId: Id of this ledger
entries: Total number of entries belong to this ledger
size: Size of messages written to this ledger (in bytes)
offloaded: Whether this ledger is offloaded
cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
markDeletePosition: All of messages before the markDeletePosition are acknowledged by the subscriber.
readPosition: The latest position of subscriber for reading message
waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
pendingReadOps: The counter for how many outstanding read requests to the BookKeepers we have in progress
messagesConsumedCounter: Number of messages this cursor has acked since this broker loaded this topic
cursorLedger: The ledger being used to persistently store the current markDeletePosition
cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition
individuallyDeletedMessages: If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position
lastLedgerSwitchTimestamp: The last time the cursor ledger was rolled over
state: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
pulsar-admin
可以使用 stats-internal
命令获取 topic 中数据的统计信息。
$ pulsar-admin persistent stats-internal \
persistent://test-tenant/ns1/tp1 \
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/internalStats
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getInternalStats(topic);
查看消息
查看指定 topic 中某个订阅的 N 条消息。
pulsar-admin
$ pulsar-admin persistent peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
Message ID: 315674752:0
Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
msg-payload
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().peekMessages(topic, subName, numMessages);
Get message by ID
It fetches the message with given ledger id and entry id.
pulsar-admin
$ ./bin/pulsar-admin topics get-message-by-id \
persistent://public/default/my-topic \
-l 10 -e 0
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
long ledgerId = 10;
long entryId = 10;
admin.persistentTopics().getMessageById(topic, ledgerId, entryId);
跳过消息
某个订阅跳过指定topic的N条消息。
pulsar-admin
$ pulsar-admin persistent skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().skipMessages(topic, subName, numMessages);
跳过所有消息
某个订阅跳过指定topic的所有消息
pulsar-admin
$ pulsar-admin persistent skip-all \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
admin.persistentTopics().skipAllMessages(topic, subName);
重置cursor
重置订阅的cursor位置回到X分钟之前被记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。
pulsar-admin
$ pulsar-admin persistent reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
long timestamp = 2342343L;
admin.persistentTopics().skipAllMessages(topic, subName, timestamp);
查询topic
定位正在服务于指定topic的broker
pulsar-admin
$ pulsar-admin persistent lookup \
persistent://test-tenant/ns1/tp1 \
"pulsar://broker1.org.com:4480"
REST API
GET /lookup/v2/topic/persistent/:tenant:namespace/:topic
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().lookupDestination(topic);
获取bundle
给出包含指定topic的bundle范围。
pulsar-admin
$ pulsar-admin persistent bundle-range \
persistent://test-tenant/ns1/tp1 \
"0x00000000_0xffffffff"
REST API
GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().getBundleRange(topic);
获取订阅
给出了指定topic的所有订阅的名称。
pulsar-admin
$ pulsar-admin persistent subscriptions \
persistent://test-tenant/ns1/tp1 \
my-subscription
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/subscriptions
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getSubscriptions(topic);
取消订阅
当不再处理更多消息时,可以取消订阅
pulsar-admin
$ pulsar-admin persistent unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subscriptionName = "my-subscription";
admin.persistentTopics().deleteSubscription(topic, subscriptionName);
最后一条消息Id
给出提交到持久topic的最后一条消息ID,将在2.3.0中提供此特性。
pulsar-admin topics last-message-id topic-name
REST API
{% endpoint Get /admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId %}
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getLastMessage(topic);