持久化有助于访问 topic,因为 topic 是发布和消费消息的逻辑端点。 Producer 发布消息到 topic,consumer 订阅 topic,并消费该 topic 上的消息。
以下说明和命令中所使用 topic 的名称结构如下:
persistent://tenant/namespace/topic
持久 topic 资源
列出 topic
以列表的形式列出指定命名空间下所有持久 topic。
pulsar-admin
可以使用 list
命令获取 topic 列表。
$ pulsar-admin persistent list \
my-tenant/my-namespace
REST API
GET /admin/v2/persistent/:tenant/:namespace
Java
String namespace = "my-tenant/my-namespace";
admin.persistentTopics().getList(namespace);
授权
授权给客户端用户,允许其在指定的 topic 上执行某些操作。
pulsar-admin
可以使用 grant-permission
命令授权。
$ pulsar-admin persistent grant-permission \
--actions produce,consume --role application1 \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
admin.persistentTopics().grantPermission(topic, role, actions);
获取权限
可以使用 permissions
命令获取权限。
pulsar-admin
$ pulsar-admin persistent permissions \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/permissions
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getPermissions(topic);
取消权限
取消已经授予客户端用户的权限。
pulsar-admin
可以使用 revoke-permission
命令取消权限。
$ pulsar-admin persistent revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
REST API
DELETE /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String role = "test-role";
admin.persistentTopics().revokePermissions(topic, role);
删除 topic
It deletes a topic. The topic cannot be deleted if there’s any active subscription or producers connected to it.
pulsar-admin
可以使用 delete
命令删除 topic。
$ pulsar-admin persistent delete \
persistent://test-tenant/ns1/tp1 \
REST API
DELETE /admin/v2/persistent/:tenant/:namespace/:topic
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().delete(topic);
卸载 topic
可以卸载 topic。
pulsar-admin
可以使用 unload
命令卸载 topic。
$ pulsar-admin persistent unload \
persistent://test-tenant/ns1/tp1 \
REST API
PUT /admin/v2/persistent/:tenant/:namespace/:topic/unload
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().unload(topic);
获取统计信息
获取非持久化 topic 的统计数据。
msgRateIn:本地发布者和副本发布者每秒发布消息速率和
msgThroughputIn:同上,但不是每秒发布消息数,而是每秒发布消息的字节数
msgRateOut:本地 consumer 和副本 consumer 每秒分发消息数量和
msgThroughputOut:同上,但不是每秒分发消息数,而是每秒分发消息的字节数
averageMsgSize:最后一次间隔内发布消息的平均字节大小
storageSize: The sum of the ledgers’ storage size for this topic. See
publishers: The list of all local publishers into the topic. There can be zero or thousands
averageMsgSize:此发布者在最后一个间隔内发布消息的平均字节大小
producerId:此 topic 内 producer 的内部标识符
producerName:客户端库为 producer 生成的内部标识符
address:连接到 producer 的 IP 地址和 source 端口
connectedSince:Producer 创建时或上次连接到此 producer 时的时间戳
subscriptions:Topic 所有本地订阅的列表
my-subscription:订阅名称(客户端定义)
msgBacklog: 本订阅在backlog中的消息数量
type:订阅类型
msgRateExpired: 由于TTL,此订阅下没有被发送而是被丢弃的比例。
consumers:连接到此订阅的 consumer 列表
consumerName:客户端库为 consumer 生成的内部标识符
availablePermits:Consumer 在客户端库监听队列中可以容纳的消息数量。 值为0意味着客户端类库的队列已经满了,receive()不再被调用。 非零值意味着 consumer 可以接收消息。
replication:Topic 交叉副本的统计信息。
replicationBacklog: 消息对外复制的backlog
connected:外部复制器是否已经连接
replicationDelayInSeconds:如果已连接,最老的消息已经等待被发送的时长。
inboundConnection:连接到此 broker 的远程集群发布者 broker 的 IP 和端口。
inboundConnectedSince:正在发布消息到远程集群的 TCP 连接。 如果没有连接到本地发布者,一分钟后连接将自动关闭。
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
pulsar-admin
可以使用 stats
命令获取 topic 的统计信息。
$ pulsar-admin persistent stats \
persistent://test-tenant/ns1/tp1 \
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/stats
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getStats(topic);
获取内部统计信息
获取 topic 的详细统计信息。
entriesAddedCounter:加载自此 broker 起发布的消息到 topic
numberOfEntries:追踪消息总数
totalSize:消息总存储量(以 byte 为单位)
currentLedgerEntries:当前开放写入操作的 ledger 中,写入消息总量
currentLedgerSize:当前开放写入操作的 ledger 中,写入消息总量(以 byte 为单位)
lastLedgerCreatedTimestamp:最后一个 ledger 创建的时间
lastLedgerCreationFailureTimestamp: 最后一次 ledger 创建失败的时间
waitingCursorsCount:与消息发布进度同步,等待新消息发布的游标数量
pendingAddEntriesCount:有等待完成的(异步)写请求的消息数量
lastConfirmedEntry:最后一条成功写入消息的 ledgerid:entryid。 如果 entryid 为 -1,则 ledger 已经允许写入或正在开放写入权限,但还没有写入 entry。
state: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.
ledgers:topic 中所保存消息的有序 ledger 列表。
cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
markDeletePosition:ack的位置:订阅者确认收到的最后一条消息
readPosition:订阅者最后读取消息的位置
waitingReadOp:当订阅者已经读取了发布到topic的最新消息,并且等待新消息发布时,值为true。
pendingReadOps:已经发往BookKeeper,进行中的读请求数量计数器
messagesConsumedCounter:此broker加载本topic以来,此cursor确认的消息数量。
cursorLedger:被用来持久化存储当前markDeletePosition的ledger
cursorLedgerLastEntry:用来持久化存储当前markDeletePosition的最后一个entryid
individuallyDeletedMessages:在确认不符合顺序的情况下,显示了markDeletePosition和读位置间的消息确认范围
lastLedgerSwitchTimestamp: cursor ledger最后一次滚动的时间
state:cursor ledger的状态:Open代表有一个可用的cursor ledger来保存markDeletePosition的更新。
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
pulsar-admin
可以使用 stats-internal
命令获取 topic 中数据的统计信息。
$ pulsar-admin persistent stats-internal \
persistent://test-tenant/ns1/tp1 \
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/internalStats
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getInternalStats(topic);
查看消息
查看指定 topic 中某个订阅的 N 条消息。
pulsar-admin
$ pulsar-admin persistent peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
Message ID: 315674752:0
Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
msg-payload
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().peekMessages(topic, subName, numMessages);
跳过消息
某个订阅跳过指定topic的N条消息。
pulsar-admin
$ pulsar-admin persistent skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
int numMessages = 1;
admin.persistentTopics().skipMessages(topic, subName, numMessages);
跳过所有消息
某个订阅跳过指定topic的所有消息
pulsar-admin
$ pulsar-admin persistent skip-all \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
admin.persistentTopics().skipAllMessages(topic, subName);
重置cursor
重置订阅的cursor位置回到X分钟之前被记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。
pulsar-admin
$ pulsar-admin persistent reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1 \
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subName = "my-subscription";
long timestamp = 2342343L;
admin.persistentTopics().skipAllMessages(topic, subName, timestamp);
查询topic
定位正在服务于指定topic的broker
pulsar-admin
$ pulsar-admin persistent lookup \
persistent://test-tenant/ns1/tp1 \
"pulsar://broker1.org.com:4480"
REST API
GET /lookup/v2/topic/persistent/:tenant:namespace/:topic
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().lookupDestination(topic);
获取bundle
给出包含指定topic的bundle范围。
pulsar-admin
$ pulsar-admin persistent bundle-range \
persistent://test-tenant/ns1/tp1 \
"0x00000000_0xffffffff"
REST API
GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.lookup().getBundleRange(topic);
获取订阅
给出了指定topic的所有订阅的名称。
pulsar-admin
$ pulsar-admin persistent subscriptions \
persistent://test-tenant/ns1/tp1 \
my-subscription
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/subscriptions
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getSubscriptions(topic);
取消订阅
当不再处理更多消息时,可以取消订阅
pulsar-admin
$ pulsar-admin persistent unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
String subscriptionName = "my-subscription";
admin.persistentTopics().deleteSubscription(topic, subscriptionName);
最后一条消息Id
给出提交到持久topic的最后一条消息ID,将在2.3.0中提供此特性。
pulsar-admin topics last-message-id topic-name
REST API
{% endpoint Get /admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId %}
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getLastMessage(topic);