You can use Pulsar's admin API to create and manage partitioned topics.
In all of the instructions and commands below, the topic name structure is:
persistent://tenant/namespace/topic
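The three-part name above maps directly onto the path segments of the REST endpoints in this section (for example PUT /admin/v2/persistent/:tenant/:namespace/:topic/partitions). The following is a minimal sketch, assuming only the domain://tenant/namespace/topic structure shown above. TopicNameParts, parse and adminV2Path are hypothetical names for illustration and are not part of the Pulsar client library.

```java
import java.util.Objects;

// Hypothetical helper, not part of the Pulsar admin API: splits a name such as
// persistent://tenant/namespace/topic and builds the matching /admin/v2 REST path.
final class TopicNameParts {
    final String domain;    // e.g. "persistent"
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameParts(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicNameParts parse(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        int sep = fullName.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("missing domain prefix: " + fullName);
        }
        // Limit -1 keeps trailing empty segments, so "tenant/ns/topic/" is rejected.
        String[] parts = fullName.substring(sep + 3).split("/", -1);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("expected domain://tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(fullName.substring(0, sep), parts[0], parts[1], parts[2]);
    }

    // Base path shared by the REST endpoints in this section, for example
    // /admin/v2/persistent/my-tenant/my-namespace/my-topic (then "/partitions", ...).
    String adminV2Path() {
        return "/admin/v2/" + domain + "/" + tenant + "/" + namespace + "/" + topic;
    }
}
```

For example, appending "/partitions" to TopicNameParts.parse("persistent://my-tenant/my-namespace/my-topic").adminV2Path() gives the path used by both the create (PUT) and get-metadata (GET) calls below.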
Partitioned topics resources
Create
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.
Note
By default, topics are considered inactive 60 seconds after creation and are deleted automatically to avoid generating trash data.
To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change how often inactive topics are checked, set brokerDeleteInactiveTopicsFrequencySeconds to your desired value. For more information about these two parameters, see the broker configuration reference.
pulsar-admin
You can create partitioned topics using the create-partitioned-topic command, specifying the topic name as an argument and the number of partitions with the -p or --partitions flag.
Here’s an example:
$ bin/pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4
Note
If a non-partitioned topic already exists whose name ends with '-partition-' followed by a number, such as 'xyz-topic-partition-10', you cannot create a partitioned topic named 'xyz-topic', because the partitions of the partitioned topic could override the existing non-partitioned topic. Delete that non-partitioned topic first, and then create the partitioned topic.
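The naming clash described in this note can be checked before you create the topic. The following minimal sketch assumes you already have the names of the existing non-partitioned topics (for example from the topics list command) and flags every name of the form <topic>-partition-<number>. PartitionNameConflicts and conflictsFor are hypothetical and not a Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical pre-flight check, not a Pulsar API: finds existing non-partitioned
// topics whose names collide with the partitions of a new partitioned topic.
final class PartitionNameConflicts {
    private PartitionNameConflicts() {}

    static List<String> conflictsFor(String partitionedTopic, List<String> existingNonPartitioned) {
        // Partitions of "xyz-topic" are named "xyz-topic-partition-0", "xyz-topic-partition-1", ...
        Pattern partitionLike =
            Pattern.compile(Pattern.quote(partitionedTopic + "-partition-") + "\\d+");
        List<String> conflicts = new ArrayList<>();
        for (String name : existingNonPartitioned) {
            // matches() requires the whole name to fit the pattern, not just a prefix.
            if (partitionLike.matcher(name).matches()) {
                conflicts.add(name);
            }
        }
        return conflicts;
    }
}
```

A non-empty result means the listed topics would have to be deleted before create-partitioned-topic can succeed for that name.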
REST API
PUT /admin/v2/persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
Create missed partitions
Create the missing partitions of a partitioned topic. The partitions of a partitioned topic must exist before they can be used, so when topic auto-creation is disabled you can use this command to repair a partitioned topic whose partitions have not all been created.
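To make the repair concrete: a partitioned topic with N partitions is backed by the topics <topic>-partition-0 through <topic>-partition-(N-1). Assuming that naming and a set of topic names that already exist, the hypothetical helper below lists the partitions that are still missing. It only illustrates the idea and does not call Pulsar; the actual creation is done by the broker when you run create-missed-partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not a Pulsar API: lists the partition topics that do not
// exist yet, assuming the "<topic>-partition-<index>" naming convention.
final class MissedPartitions {
    private MissedPartitions() {}

    static List<String> missing(String partitionedTopic, int numPartitions, Set<String> existingTopics) {
        List<String> result = new ArrayList<>();
        // Partition indexes run from 0 to numPartitions - 1.
        for (int i = 0; i < numPartitions; i++) {
            String partition = partitionedTopic + "-partition-" + i;
            if (!existingTopics.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }
}
```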
pulsar-admin
You can create missed partitions using the create-missed-partitions command and specifying the topic name as an argument.
Here’s an example:
$ bin/pulsar-admin topics create-missed-partitions \
persistent://my-tenant/my-namespace/my-topic
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/createMissedPartitions
Java
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().createMissedPartitions(topicName);
Get metadata
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
Field | Description |
---|---|
partitions | The number of partitions into which the topic is divided |
pulsar-admin
You can see the number of partitions in a partitioned topic using the get-partitioned-topic-metadata subcommand. Here's an example:
$ pulsar-admin topics get-partitioned-topic-metadata \
persistent://my-tenant/my-namespace/my-topic
{
"partitions": 4
}
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().getPartitionedTopicMetadata(topicName);
Update
You can update the number of partitions on an existing partitioned topic if the topic is non-global. To update, the new number of partitions must be greater than the existing number.
Decrementing the number of partitions would delete the topic, which is not supported in Pulsar.
Partitioned producers and consumers that have already been created automatically find the newly created partitions.
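The rule above (the partition count can only grow) can be enforced on the client side before calling the admin API. In this sketch, the hypothetical PartitionUpdatePlan rejects any request that does not increase the count and, assuming the standard <topic>-partition-<index> naming, lists the partitions an update would add. Existing partitions keep their indexes, so the new ones continue from the current count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side check, not a Pulsar API: the partition count of a
// partitioned topic can only be increased.
final class PartitionUpdatePlan {
    private PartitionUpdatePlan() {}

    // Returns the partition topics that an update from `current` to `requested` would add.
    static List<String> addedPartitions(String partitionedTopic, int current, int requested) {
        if (requested <= current) {
            throw new IllegalArgumentException("New partition count " + requested
                + " must be greater than the current count " + current);
        }
        List<String> added = new ArrayList<>();
        // Existing partitions keep indexes 0..current-1; new ones continue from `current`.
        for (int i = current; i < requested; i++) {
            added.add(partitionedTopic + "-partition-" + i);
        }
        return added;
    }
}
```

Updating my-topic from 4 to 8 partitions, as in the pulsar-admin example, would add my-topic-partition-4 through my-topic-partition-7.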
pulsar-admin
Partitioned topics can be updated using the update-partitioned-topic command.
$ pulsar-admin topics update-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 8
REST API
POST /admin/v2/persistent/:tenant/:namespace/:topic/partitions
Java
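String persistentTopic = "persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 8; // must be greater than the current number of partitions
admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);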
Delete
pulsar-admin
Partitioned topics can be deleted using the delete-partitioned-topic command, specifying the topic by name:
$ bin/pulsar-admin topics delete-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic
REST API
DELETE /admin/v2/persistent/:tenant/:namespace/:topic/partitions
Java
String persistentTopic = "persistent://my-tenant/my-namespace/my-topic";
admin.persistentTopics().deletePartitionedTopic(persistentTopic);
List
It provides a list of persistent topics existing under a given namespace.
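If the list returned for a namespace includes individual partitions (names ending in -partition-<number>), it can be convenient to group them under their partitioned topic. The following sketch does that with a regular expression. TopicListGrouping and group are hypothetical, and the only assumption is the -partition-<number> suffix.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical post-processing, not a Pulsar API: collapses "<topic>-partition-<n>"
// entries from a topic list under their partitioned topic name.
final class TopicListGrouping {
    private static final Pattern PARTITION = Pattern.compile("^(.+)-partition-(\\d+)$");

    private TopicListGrouping() {}

    // Maps each base topic name to the sorted partition indexes found
    // (an empty set for topics without a -partition-<n> suffix).
    static Map<String, TreeSet<Integer>> group(List<String> topics) {
        Map<String, TreeSet<Integer>> grouped = new TreeMap<>();
        for (String topic : topics) {
            Matcher m = PARTITION.matcher(topic);
            if (m.matches()) {
                grouped.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                       .add(Integer.parseInt(m.group(2)));
            } else {
                grouped.computeIfAbsent(topic, k -> new TreeSet<>());
            }
        }
        return grouped;
    }
}
```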
pulsar-admin
$ pulsar-admin topics list tenant/namespace
persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2
REST API
GET /admin/v2/persistent/:tenant/:namespace
Java
String namespace = "my-tenant/my-namespace";
admin.persistentTopics().getList(namespace);
Stats
It shows current statistics of a given partitioned topic. Here’s an example payload:
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
The following stats are available:
Stat | Description |
---|---|
msgRateIn | The sum of all local and replication publishers’ publish rates in messages per second |
msgThroughputIn | Same as msgRateIn but in bytes per second instead of messages per second |
msgRateOut | The sum of all local and replication consumers’ dispatch rates in messages per second |
msgThroughputOut | Same as msgRateOut but in bytes per second instead of messages per second |
averageMsgSize | Average message size, in bytes, from this publisher within the last interval |
storageSize | The sum of the ledgers’ storage size for this topic |
publishers | The list of all local publishers into the topic. There can be anywhere from zero to thousands. |
producerId | Internal identifier for this producer on this topic |
producerName | Internal identifier for this producer, generated by the client library |
address | IP address and source port for the connection of this producer |
connectedSince | Timestamp this producer was created or last reconnected |
subscriptions | The list of all local subscriptions to the topic |
my-subscription | The name of this subscription (client defined) |
msgBacklog | The count of messages in backlog for this subscription |
msgBacklogNoDelayed | The count of messages in backlog without delayed messages for this subscription |
type | This subscription type |
msgRateExpired | The rate at which messages were discarded instead of dispatched from this subscription due to TTL |
consumers | The list of connected consumers for this subscription |
consumerName | Internal identifier for this consumer, generated by the client library |
availablePermits | The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A non-zero value means this consumer is ready to be dispatched messages. |
replication | This section gives the stats for cross-colo replication of this topic |
replicationBacklog | The outbound replication backlog in messages |
connected | Whether the outbound replicator is connected |
replicationDelayInSeconds | How long the oldest message has been waiting to be sent through the connection, if connected is true |
inboundConnection | The IP and port of the broker in the remote cluster’s publisher connection to this broker |
inboundConnectedSince | The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute. |
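Because msgRateIn and msgThroughputIn are sums over publishers, a topic-level figure can be read as the sum of the corresponding per-partition figures, such as those returned with --per-partition. The sketch below illustrates that roll-up with a hypothetical PartitionStat holder rather than Pulsar's own stats classes, and assumes you have already extracted the per-partition values.

```java
import java.util.List;

// Hypothetical sketch, not the Pulsar stats classes: rolls per-partition numbers
// up to topic-level totals by summing them.
final class PartitionStatsRollup {
    // Minimal stand-in for the few stats fields used here.
    static final class PartitionStat {
        final double msgRateIn;       // messages/s published to this partition
        final double msgThroughputIn; // bytes/s published to this partition
        final long msgBacklog;        // backlog of one subscription on this partition

        PartitionStat(double msgRateIn, double msgThroughputIn, long msgBacklog) {
            this.msgRateIn = msgRateIn;
            this.msgThroughputIn = msgThroughputIn;
            this.msgBacklog = msgBacklog;
        }
    }

    private PartitionStatsRollup() {}

    static double totalMsgRateIn(List<PartitionStat> partitions) {
        return partitions.stream().mapToDouble(p -> p.msgRateIn).sum();
    }

    static double totalMsgThroughputIn(List<PartitionStat> partitions) {
        return partitions.stream().mapToDouble(p -> p.msgThroughputIn).sum();
    }

    static long totalMsgBacklog(List<PartitionStat> partitions) {
        return partitions.stream().mapToLong(p -> p.msgBacklog).sum();
    }
}
```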
pulsar-admin
The stats for the partitioned topic and its connected producers and consumers can be fetched by using the partitioned-stats command, specifying the topic by name:
$ pulsar-admin topics partitioned-stats \
persistent://test-tenant/namespace/topic \
--per-partition
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/partitioned-stats
Java
String persistentTopic = "persistent://test-tenant/namespace/topic";
admin.topics().getPartitionedStats(persistentTopic, true /* per partition */, false /* is precise backlog */);
Internal stats
It shows more detailed statistics of a topic.
Stat | Description |
---|---|
entriesAddedCounter | Messages published since this broker loaded this topic |
numberOfEntries | Total number of messages being tracked |
totalSize | Total storage size in bytes of all messages |
currentLedgerEntries | Count of messages written to the ledger currently open for writing |
currentLedgerSize | Size in bytes of messages written to ledger currently open for writing |
lastLedgerCreatedTimestamp | Time when last ledger was created |
lastLedgerCreationFailureTimestamp | Time when the last ledger creation failed |
waitingCursorsCount | How many cursors are caught up and waiting for a new message to be published |
pendingAddEntriesCount | How many messages have (asynchronous) write requests that are still waiting for completion |
lastConfirmedEntry | The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet. |
state | The state of the cursor ledger. Open means we have a cursor ledger for saving updates of the markDeletePosition. |
ledgers | The ordered list of all ledgers for this topic holding its messages |
cursors | The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats. |
markDeletePosition | The ack position: the last message the subscriber acknowledged receiving |
readPosition | The latest position of subscriber for reading message |
waitingReadOp | This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published. |
pendingReadOps | The counter for how many outstanding read requests to the BookKeepers we have in progress |
messagesConsumedCounter | Number of messages this cursor has acked since this broker loaded this topic |
cursorLedger | The ledger being used to persistently store the current markDeletePosition |
cursorLedgerLastEntry | The last entryid used to persistently store the current markDeletePosition |
individuallyDeletedMessages | If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position |
lastLedgerSwitchTimestamp | The last time the cursor ledger was rolled over |
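Several fields above (lastConfirmedEntry, markDeletePosition, readPosition) are ledgerid:entryid strings. A minimal parser, assuming exactly that format, could look like this. LedgerPosition is a hypothetical class, and its ordering (by ledger ID, then entry ID) is an assumption that matches how the positions in the example payload that follows relate to each other.

```java
// Hypothetical helper, not a Pulsar API: parses "ledgerId:entryId" position strings.
final class LedgerPosition implements Comparable<LedgerPosition> {
    final long ledgerId;
    final long entryId;

    LedgerPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    static LedgerPosition parse(String text) {
        int colon = text.indexOf(':');
        if (colon <= 0 || colon == text.length() - 1) {
            throw new IllegalArgumentException("expected ledgerId:entryId, got " + text);
        }
        return new LedgerPosition(Long.parseLong(text.substring(0, colon)),
                                  Long.parseLong(text.substring(colon + 1)));
    }

    // For lastConfirmedEntry: an entry ID of -1 means the ledger is open (or opening)
    // but no entry has been written to it yet.
    boolean noEntriesWritten() {
        return entryId == -1;
    }

    // Assumed ordering: compare ledger IDs first, then entry IDs within a ledger.
    @Override
    public int compareTo(LedgerPosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}
```

With the payload that follows, LedgerPosition.parse("324711539:3133") (markDeletePosition) compares lower than LedgerPosition.parse("324711539:3233") (readPosition).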
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
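The individuallyDeletedMessages value in this example is a list of ranges such as (324711539:3134‥324711539:3136], where a range excludes its start and includes its end. Assuming that notation, the hypothetical AckedRanges helper below counts how many entries those ranges cover when a range stays within a single ledger. For the example above it counts 2 + 3 = 5 entries acknowledged out of order.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser, not a Pulsar API, for the individuallyDeletedMessages string.
// Assumes each range is printed as "(ledger:entry..ledger:entry]" (open start, closed end).
final class AckedRanges {
    // The separator is U+2025 (two-dot leader), as printed in the stats output.
    private static final Pattern RANGE =
        Pattern.compile("\\((\\d+):(\\d+)\u2025(\\d+):(\\d+)\\]");

    private AckedRanges() {}

    // Counts acknowledged entries in ranges that start and end in the same ledger.
    static long countSameLedgerEntries(String individuallyDeleted) {
        Matcher m = RANGE.matcher(individuallyDeleted);
        long total = 0;
        while (m.find()) {
            long startLedger = Long.parseLong(m.group(1));
            long endLedger = Long.parseLong(m.group(3));
            if (startLedger != endLedger) {
                continue; // cross-ledger ranges would need per-ledger entry counts
            }
            // (start, end] excludes start and includes end, so it covers end - start entries.
            total += Long.parseLong(m.group(4)) - Long.parseLong(m.group(2));
        }
        return total;
    }
}
```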
pulsar-admin
The internal stats for the partitioned topic can be fetched by using the stats-internal command, specifying the topic by name:
$ pulsar-admin topics stats-internal \
persistent://test-tenant/namespace/topic
REST API
GET /admin/v2/persistent/:tenant/:namespace/:topic/internalStats
Java
String persistentTopic = "persistent://test-tenant/namespace/topic";
admin.persistentTopics().getInternalStats(persistentTopic);