Manage topics
Pulsar 提供持久化与非持久化两种topic。 持久化topic是消息发布、消费的逻辑端点。 持久化主题地址的命名格式如下:
persistent://tenant/namespace/topic
非持久主题应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息发布延迟。 非持久化topic地址的命名格式如下:
non-persistent://tenant/namespace/topic
管理主题资源
无论是持久化还是非持久化主题,你可以通过pulsar-admin
工具、REST API 、 Java 获取到主题资源。
备注
REST API 中:schema
分为 persistent(持久化)和 non-persistent(非持久化)。:tenant
,:namespace
,:x
是变量,在使用时候请注意使用真正的租户、命名空间,和x
替换。
以 GET /admin/v2/:schema/:tenant/:namespace 为例, 若要获取 REST API 中持久化主题列表,请参考https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace
。 若要获取 REST API 中非持久化主题的列表,请参考https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace
。
列出 topic
你可以通过以下方式获得特定命名空间下的 topic 列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list \ my-tenant/my-namespace
GET /admin/v2/:schema/:tenant/:namespace
String namespace = "my-tenant/my-namespace";admin.topics().getList(namespace);
授权
通过以下方式可以在客户端角色上授权,以便于在指定的主题上执行具体的操作。
pulsar-admin
REST API
Java
$ pulsar-admin topics grant-permission \ --actions produce,consume --role application1 \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role
String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);admin.topics().grantPermission(topic, role, actions);
获取权限
通过以下方式获取权限认证。
pulsar-admin
REST API
Java
$ pulsar-admin topics permissions \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}
GET /admin/v2/:schema/:tenant/:namespace/:topic/permissions
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPermissions(topic);
取消权限
您可以通过以下方式撤销授予客户端角色的权限。
pulsar-admin
REST API
Java
$ pulsar-admin topics revoke-permission \ --role application1 \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}
DELETE /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role
String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";admin.topics().revokePermissions(topic, role);
删除 topic
您可以通过以下方式删除一个主题。 如果某个主题拥有任何活跃的订阅或者生产者,则不能对其删除。
pulsar-admin
REST API
Java
$ pulsar-admin topics delete \ persistent://test-tenant/ns1/tp1 \
DELETE /admin/v2/:schema/:tenant/:namespace/:topic
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().delete(topic);
卸载 topic
通过以下方式卸载主题。
pulsar-admin
REST API
Java
$ pulsar-admin topics unload \ persistent://test-tenant/ns1/tp1 \
PUT /admin/v2/:schema/:tenant/:namespace/:topic/unload
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().unload(topic);
获取统计信息
你可以检查给定的未分区主题的以下统计数据。
msgRateIn:所有本地和副本发布者每秒发布消息速率之和(msg/s)。
msgThroughputIn:所有本地和副本发布者每秒发布消息字节数之和(byte/s)。
msgRateOut:所有本地和副本消费者每秒调度消息率之和(msg/s)。
msgThroughputOut:所有本地和副本消费者每秒调度消息字节数之和(byte/s)。
averageMsgSize:在最近时间间隔内发布消息的平均大小(以字节为单位)。
storageSize: The sum of the ledgers’ storage size for this topic. 用于存储该主题消息的空间。
publishers: The list of all local publishers into the topic. 该列表的范围从零到千不等。
msgRateIn:发布者每秒发布消息的总速率(msg/s)。
msgThroughputIn:发布者发布消息的总吞吐量(byte/s)。
averageMsgSize:发布者在最近时间间隔内发布消息的平均大小(以字节为单位)。
producerId:该主题对应的生产者内部识别号。
producerName:由客户端生成的生产者内部识别名称。
address:连接生产者所需的 IP 地址和端口号。
connectedSince:生产者上次创建或者重新连接时的时间戳。
subscriptions:该主题下的所有本地订阅列表。
my-subscription:当前订阅的订阅名称。 通过客户端去定义。
msgRateOut:在此订阅中发送消息的总速率(msg/s)。
msgThroughputOut:在此订阅上发送消息的总吞吐量(byte/s)。
msgBacklog:在此订阅上积压的消息数量。
type:订阅类型。
msgRateExpired:由于 TTL 的原因,消息被丢弃而不是发送到此订阅中的比例。
lastExpireTimestamp:最后一条执行消息过期的时间戳。
lastConsumedFlowTimestamp:收到的最后一条流量指令的时间戳。
lastConsumedTimestamp:消费者所有已消费消息的最新时间戳。
lastAckedTimestamp:消费者所有已被 ack 消息的最新时间戳
consumers:连接到此订阅的消费者列表。
msgRateOut:发送给消费者的消息总速率(msg/s)。
msgThroughputOut: 发送给消费者的总吞吐量(byte/s)。
consumerName:由客户端生成的消费者内部标识符。
availablePermits:消费者在客户端库的监听队列中有空闲的消息数量。
0
意味着客户端库的队列已经满了,receive()
方法不会再接收消息。 非 0 值意味着该消费者可以接收消息。unackedMessages:消费者未确认消息的数量。
blockedConsumerOnUnackedMsgs:验证消费者是否因达到未确认消息数的阀值而被阻塞。
lastConsumedTimestamp:消费者最后一次读取消息的时间戳。
lastAckedTimestamp:消费者最后一次确认消息的时间戳。
replication: This section gives the stats for cross-colo replication of this topic
msgRateIn:从远程集群中收到消息的总速率(msg/s)。
msgThroughputIn:从远程集群中收到消息的总吞吐量(byte/s)。
msgRateOut:发送给副本订阅的消息总速率(msg/s)。
msgThroughputOut:发送给副本订阅的消息总吞吐量(byte/s)。
msgRateExpired:过期消息的总速率(msg/s)。
replicationBacklog:待复制到远程集群的消息数量。
connected:验证外部副本连接器是否已经连接。
replicationDelayInSeconds:如果连接是
true
的话,最早消息的已等待被发送的时长。inboundConnection:远程集群中的 broker 要连接到此 broker 的 IP 和端口。
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。
outboundConnection:外部副本连接的地址。
outboundConnectedSince:建立对外连接时的时间戳。
下面是主题状态的示例:
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
使用以下方式来获取一个主题的状态:
pulsar-admin
REST API
Java
$ pulsar-admin topics stats \ persistent://test-tenant/ns1/tp1 \
GET /admin/v2/:schema/:tenant/:namespace/:topic/stats
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getStats(topic);
获取内部统计信息
主题中的详细统计数据如下:
entriesAddedCounter:自此 broker 加载该主题以来发布的消息总量。
numberOfEntries:追踪消息的总数。
totalSize:消息的总存储大小(以字节为单位)。
currentLedgerEntries:当前打开写入操作的 ledger 中,写入的消息总数。
currentLedgerSize:当前打开写入操作的 ledger 中,写入的消息字节大小。
lastLedgerCreatedTimestamp:最后一个 ledger 创建的时间。
lastLedgerCreationFailureTimestamp: 最后一个 ledger 失败的时间。
waitingCursorsCount:等待新消息发布并标记为 “caught up”的游标数量。
pendingAddEntriesCount:完成(异步)写请求的消息数。
lastConfirmedEntry:最后一条成功写入消息的 ledger:entryid。 如果 entryid 为
-1
,即 ledger 是开启状态,但没有写入任何的 entry 。state: The state of this ledger for writing.
LedgerOpened
意味着 ledger 是开启状态,可以保存已发布的消息。ledgers:主题中所保存消息的有序 ledger 列表。
ledgerId: 此 ledger 的 ID。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。
metadata: 该 ledger 的元数据。
schemaLedgers:该主题模式下所有 ledger 的有序列表。
ledgerId: 此 ledger 的 ID。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。
metadata: 该 ledger 的元数据。
compactedLedger: The ledgers holding un-acked messages after topic compaction.
ledgerId: 此 ledger 的 ID。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。 The value is
false
for the compacted topic ledger.
cursors: The list of all cursors on this topic. Each subscription in the topic stats has a cursor.
markDeletePosition: All messages before the markDeletePosition are acknowledged by the subscriber.
readPosition: The latest position of subscriber for reading message.
waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting for new messages to be published.
pendingReadOps: The counter for how many outstanding read requests to the BookKeepers in progress.
messagesConsumedCounter: The number of messages this cursor has acked since this broker loaded this topic.
cursorLedger: The ledger being used to persistently store the current markDeletePosition.
cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition.
individuallyDeletedMessages: If acknowledges are being done out of order, the ranges of messages acknowledged between the markDeletePosition and the read-position shows.
lastLedgerSwitchTimestamp: The last time the cursor ledger is rolled over.
state: The state of the cursor ledger:
Open
means you have a cursor ledger for saving updates of the markDeletePosition.
下面是关于一个主题的详细统计示例。
{
"entriesAddedCounter":0,
"numberOfEntries":0,
"totalSize":0,
"currentLedgerEntries":0,
"currentLedgerSize":0,
"lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
"lastLedgerCreationFailureTimestamp":null,
"waitingCursorsCount":0,
"pendingAddEntriesCount":0,
"lastConfirmedEntry":"3:-1",
"state":"LedgerOpened",
"ledgers":[
{
"ledgerId":3,
"entries":0,
"size":0,
"offloaded":false,
"metadata":null
}
],
"cursors":{
"test":{
"markDeletePosition":"3:-1",
"readPosition":"3:-1",
"waitingReadOp":false,
"pendingReadOps":0,
"messagesConsumedCounter":0,
"cursorLedger":4,
"cursorLedgerLastEntry":1,
"individuallyDeletedMessages":"[]",
"lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
"state":"Open",
"numberOfEntriesSinceFirstNotAckedMessage":0,
"totalNonContiguousDeletedMessagesRange":0,
"properties":{
}
}
},
"schemaLedgers":[
{
"ledgerId":1,
"entries":11,
"size":10,
"offloaded":false,
"metadata":null
}
],
"compactedLedger":{
"ledgerId":-1,
"entries":-1,
"size":-1,
"offloaded":false,
"metadata":null
}
}
可以使用以下方式来获取一个主题的内部状态。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats-internal \ persistent://test-tenant/ns1/tp1 \
GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getInternalStats(topic);
查看消息
可以使用以下方式为某一主题的特定订阅提供一些信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics peek-messages \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \Message ID: 315674752:0Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }msg-payload
GET /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";int numMessages = 1;admin.topics().peekMessages(topic, subName, numMessages);
Get message by ID
可以使用以下方式获取给定 ledger ID 和 entry ID 的信息。
pulsar-admin
REST API
Java
$ ./bin/pulsar-admin topics get-message-by-id \ persistent://public/default/my-topic \ -l 10 -e 0
GET /admin/v2/:schema/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId
String topic = "persistent://my-tenant/my-namespace/my-topic";long ledgerId = 10;long entryId = 10;admin.topics().getMessageById(topic, ledgerId, entryId);
跳过消息
可以使用以下方式跳过某一主题的特定订阅的一些信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics skip \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";int numMessages = 1;admin.topics().skipMessages(topic, subName, numMessages);
跳过所有消息
跳过某一主题的特定订阅的所有旧消息。
pulsar-admin
REST API
Java
$ pulsar-admin topics skip-all \ --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip_all
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";admin.topics().skipAllMessages(topic, subName);
重置cursor
可以将一个订阅的游标位置重新设置为 X 分钟前记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。 你可以用下面方式重置 cursor。
pulsar-admin
REST API
Java
$ pulsar-admin topics reset-cursor \ --subscription my-subscription --time 10 \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";long timestamp = 2342343L;admin.topics().skipAllMessages(topic, subName, timestamp);
查询topic
可以通过以下方式找到服务于特定主题的 broker URL。
pulsar-admin
REST API
Java
$ pulsar-admin topics lookup \ persistent://test-tenant/ns1/tp1 \ "pulsar://broker1.org.com:4480"
GET /lookup/v2/topic/:schema/:tenant:namespace/:topic
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().lookupDestination(topic);
获取bundle
可以通过以下方式检查包含给定主题的 bundle 范围。
pulsar-admin
REST API
Java
$ pulsar-admin topics bundle-range \ persistent://test-tenant/ns1/tp1 \ "0x00000000_0xffffffff"
GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().getBundleRange(topic);
获取订阅
可以通过以下方式查看某个主题的所有订阅名称。
pulsar-admin
REST API
Java
$ pulsar-admin topics subscriptions \ persistent://test-tenant/ns1/tp1 \ my-subscription
GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getSubscriptions(topic);
最后一条消息Id
可以获得一个持久化主题最后提交的消息 ID 。 2.3.0 往后版本都可用。
pulsar-admin
REST API
Java
pulsar-admin topics last-message-id topic-name
Get /admin/v2/:schema/:tenant/:namespace/:topic/lastMessageId?version=2.9.2
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getLastMessage(topic);
配置去重快照间隔
获取去重快照间隔
获取主题级去重快照间隔可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-deduplication-snapshot-interval options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
admin.topics().getDeduplicationSnapshotInterval(topic)
设置去重快照间隔
设置主题级去重快照间隔,可以使用以下方法之一。
前提条件
brokerDeduplicationEnabled
必须设置为true
。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-deduplication-snapshot-interval options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
{ "interval": 1000}
admin.topics().setDeduplicationSnapshotInterval(topic, 1000)
移除去重快照间隔
移除主题级去重快照间隔,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics remove-deduplication-snapshot-interval options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
admin.topics().removeDeduplicationSnapshotInterval(topic)
配置非活跃主题策略
获取非活跃主题策略
获取主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-inactive-topic-policies options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().getInactiveTopicPolicies(topic)
设置非活跃主题策略
设置主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-inactive-topic-policies options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies)
移除非活跃主题策略
移除主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics remove-inactive-topic-policies options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().removeInactiveTopicPolicies(topic)
配置卸载策略
获取卸载策略
获取主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-offload-policies options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().getOffloadPolicies(topic)
设置卸载策略
设置主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-offload-policies options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().setOffloadPolicies(topic, offloadPolicies)
移除卸载策略
移除主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics remove-offload-policies options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().removeOffloadPolicies(topic)
管理非分区主题
可以使用 Pulsar admin API 来创建、删除和检查非分区主题的状态。
创建
必须明确创建非分区主题。 当创建一个新的非分区主题时,需要为该主题提供一个名称。
默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled
to false
. 设置 brokerDeleteInactiveTopicsFrequencySeconds
为特殊值以改变检查非活动主题的频率。
关于这两个参数的更多信息,请参阅 这里。
可以通过以下方式创建非分区主题。
pulsar-admin
REST API
Java
When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.
$ bin/pulsar-admin topics create \ persistent://my-tenant/my-namespace/my-topic
备注
当你创建一个后缀为 ‘’-partition-‘ 的非分区主题时,主题名称后跟数字值,如 ‘xyz-topic-partition-x’ ,如果存在 ‘xyz-topic-partition-y’ 的分区主题,那么非分区主题的数字值(x)必须大于分区主题的分区数(y)。 否则就无法创建未分区主题。
PUT /admin/v2/:schema/:tenant/:namespace/:topic
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createNonPartitionedTopic(topicName);
删除
可以通过以下方式来删除非分区主题。
pulsar-admin
REST API
Java
$ bin/pulsar-admin topics delete \ persistent://my-tenant/my-namespace/my-topic
DELETE /admin/v2/:schema/:tenant/:namespace/:topic
admin.topics().delete(topic);
获取资源列表
你可以通过以下方式获得特定命名空间下的 topic 列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2
GET /admin/v2/:schema/:tenant/:namespace
admin.topics().getList(namespace);
统计信息
检查某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats。
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
可以通过以下方式检查某个主题及其相关生产者和消费者的当前统计数据。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats \ persistent://test-tenant/namespace/topic \ --get-precise-backlog
GET /admin/v2/:schema/:tenant/:namespace/:topic/stats
admin.topics().getStats(topic, false /* is precise backlog */);
管理分区主题
可以使用 Pulsar admin API 来创建、更新、删除和检查分区主题的状态。
创建
必须明确创建分区主题。 当创建一个新的分区主题时,需要为该主题提供一个名称和分区数量。
默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled
to false
. 设置 brokerDeleteInactiveTopicsFrequencySeconds
为特殊值以改变检查非活动主题的频率。
关于这两个参数的更多信息,请参阅 这里。
可以通过以下方式创建分区主题。
pulsar-admin
REST API
Java
When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p
or --partitions
flag.
$ bin/pulsar-admin topics create-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 4
备注
如果一个非分区主题的后缀是 ‘-partition-‘ 并且后面跟着一个数字值,比如 ‘xyz-topic-partition-10’,那么就不能创建名为 ‘xyz-topic’ 的分区主题,因为分区主题的分区可以覆盖现有的非分区主题。 必须删除上述的非分区主题,才能创建该分区主题。
PUT /admin/v2/:schema/:tenant/:namespace/:topic/partitions
String topicName = "persistent://my-tenant/my-namespace/my-topic";int numPartitions = 4;admin.topics().createPartitionedTopic(topicName, numPartitions);
Create missed partitions
在主题的 auto-creation 是禁用状态并且有一个没有任何分区的主题时,可以使用 create-missed-partitions 命令为主题创建分区。
pulsar-admin
REST API
Java
可以用 create-missed-partitions 命令指定主题名称作为参数来创建 miss 分区。
$ bin/pulsar-admin topics create-missed-partitions \ persistent://my-tenant/my-namespace/my-topic \
POST /admin/v2/:schema/:tenant/:namespace/:topic
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createMissedPartitions(topicName);
获取元数据
已分区的主题与元数据相关联,可以将其看作一个 JSON 对象。 以下元数据字段是可用的。
字段 | 说明 |
---|---|
分区 | 主题中的分区数量。 |
pulsar-admin
REST API
Java
可以通过 get-partitioned-topic-metadata 子命令检查分区主题的分区数量。
$ pulsar-admin topics get-partitioned-topic-metadata \ persistent://my-tenant/my-namespace/my-topic{ "partitions": 4}
GET /admin/v2/:schema/:tenant/:namespace/:topic/partitions
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPartitionedTopicMetadata(topicName);
更新
如果 主题是非全局的,你可以更新现有已分区主题的分区数量。 然而,你只能添加分区号。 减少分区的数量就会删除对应主题,在 Pulsar 中是不支持的。
生产者和消费者可以自动找到新创建的分区。
pulsar-admin
REST API
Java
可以使用 update-partitioned-topic 命令更新分区主题。
$ pulsar-admin topics update-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 8
POST /admin/v2/:schema/:tenant/:cluster/:namespace/:destination/partitions
admin.topics().updatePartitionedTopic(topic, numPartitions);
删除
可以使用 delete-partitioned-topic 命令、REST API 或者 Java 删除分区主题。
pulsar-admin
REST API
Java
$ bin/pulsar-admin topics delete-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic
DELETE /admin/v2/:schema/:topic/:namespace/:destination/partitions
admin.topics().delete(topic);
获取资源列表
你可以通过以下方式获取给定命名空间下的分区主题列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list-partitioned-topics tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2
GET /admin/v2/:schema/:tenant/:namespace
admin.topics().getPartitionedTopicList(namespace);
统计信息
可以查看某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats。
请注意,在订阅的 JSON 对象中, chuckedMessageRate
已被废弃。 请使用 chunkedMessageRate
。 两者目前都将发送到 JSON 中。
{
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 270318763,
"msgInCounter" : 252489,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1070.926056966454,
"msgChunkPublished" : false,
"storageSize" : 270316646,
"backlogSize" : 200921133,
"publishers" : [ {
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"averageMsgSize" : 1070.3333333333333,
"chunkedMessageRate" : 0.0,
"producerId" : 0
} ],
"subscriptions" : {
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"chunkedMessageRate" : 0,
"msgBacklog" : 144318,
"msgBacklogNoDelayed" : 144318,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"metadata" : {
"partitions" : 3
},
"partitions" : { }
}
你可以通过以下方式检查给定分区主题及其当前连接的生产者和消费者的统计信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics partitioned-stats \ persistent://test-tenant/namespace/topic \ --per-partition
GET /admin/v2/:schema/:tenant/:namespace/:topic/partitioned-stats
admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);
Internal stats
你可以检查主题的详细统计数据。 The following is an example. 有关每个统计信息的说明,详见获取内部统计信息。
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
你可以通过以下方式获取分区主题的内部统计信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats-internal \ persistent://test-tenant/namespace/topic
GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats
admin.topics().getInternalStats(topic);
发布到分区主题
默认情况下,Pulsar 主题由单个 broker 提供服务,这限制了主题的最大吞吐量。 分区主题可以跨越多个 broker,从而实现更高的吞吐量。
你可以使用 Pulsar 客户端库发布到分区主题。 发布到分区主题时,必须指定路由模式。 如果在创建新的生产者时没有指定任何路由方式,则使用轮询路由模式。
Routing mode
You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. 路由模式决定了每条消息应该发往哪个分区(内部主题)。
以下 MessageRoutingMode 选项是可用的。
发送模式 | 说明 |
---|---|
RoundRobinPartition | 如果没有提供 key,生产者将在所有分区中以轮训策略进行发布消息,以达到最大的吞吐量。 请注意轮训并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。 这是默认的模式。 |
SinglePartition | 如果消息没有指定 key,生产者会随机挑选一个分区,并发布所有消息到该分区。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。 |
CustomPartition | 使用自定义消息路由器实现来决定特定消息的分区。 你可以通过使用 Java 客户端和实现 MessageRouter 接口来创建一个自定义路由模式。 |
如下所示:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-namespace/my-topic";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("Partitioned topic message".getBytes());
Custom message router
要使用自定义消息路由器,您需要提供MessageRouter 接口的实现,该接口只有一个choosePartition
方法:
public interface MessageRouter extends Serializable {
int choosePartition(Message msg);
}
下面的路由模式表示所有的消息都会被发送到分区10:
public class AlwaysTenRouter implements MessageRouter {
public int choosePartition(Message msg) {
return 10;
}
}
通过该实现,你可以发送:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRouter(new AlwaysTenRouter())
.create();
producer.send("Partitioned topic message".getBytes());
使用 Key 时如何选择分区
If a message has a key, it supersedes the round robin routing policy. 以下示例说明当使用key时如何选择分区。
// 如果消息存在key,轮询路由策略将被替换
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
if (isBatchingEnabled) { // 如果开启批处理,请在 `partitionSwitchMs` 边界上选择分区。
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartiations());
other.
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartiations());
}
管理订阅
你可以使用 Pulsar admin Apl 创建、检查和删除订阅。
创建订阅
You can create a subscription for a topic using one of the following methods.
pulsar-admin
REST API
Java
pulsar-admin topics create-subscription \--subscription my-subscription \persistent://test-tenant/ns1/tp1
PUT /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subscription
String topic = "persistent://my-tenant/my-namespace/my-topic";String subscriptionName = "my-subscription";admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);
### 获取订阅 You can check all subscription names for a given topic using one of the following methods.
pulsar-admin
REST API
Java
pulsar-admin topics subscriptions \persistent://test-tenant/ns1/tp1 \my-subscription
GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getSubscriptions(topic);
### 取消订阅 When a subscription does not process messages any more, you can unsubscribe it using one of the following methods.
pulsar-admin
REST API
Java
pulsar-admin topics unsubscribe \--subscription my-subscription \persistent://test-tenant/ns1/tp1
DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription
String topic = "persistent://my-tenant/my-namespace/my-topic";String subscriptionName = "my-subscription";admin.topics().deleteSubscription(topic, subscriptionName);