Manage topics

Pulsar provides two kinds of topics: persistent and non-persistent. A persistent topic is a logical endpoint for publishing and consuming messages. The topic name structure for persistent topics is:

  persistent://tenant/namespace/topic

Non-persistent topics are used in applications that only consume messages published in real time and do not need persistence guarantees. Skipping the overhead of persisting messages reduces message-publish latency. The topic name structure for non-persistent topics is:

  non-persistent://tenant/namespace/topic
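As a rough illustration of the two name structures above, the following sketch splits a full topic name into its domain, tenant, namespace, and local topic name. The class TopicNameParts is hypothetical and written only for this example; it is not part of the Pulsar client library, which has its own topic-name handling.

```java
/** Hypothetical helper for illustration only; not a Pulsar client class. */
final class TopicNameParts {
    final String domain;    // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String topic;     // local topic name

    private TopicNameParts(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    /** Parses names of the form domain://tenant/namespace/topic. */
    static TopicNameParts parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // Split into at most three parts: tenant, namespace, and the remainder.
        String[] rest = fullName.substring(sep + 3).split("/", 3);
        if (rest.length != 3 || rest[0].isEmpty() || rest[1].isEmpty() || rest[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(domain, rest[0], rest[1], rest[2]);
    }

    public static void main(String[] args) {
        TopicNameParts p = parse("non-persistent://my-tenant/my-namespace/my-topic");
        System.out.println(p.domain + " | " + p.tenant + " | " + p.namespace + " | " + p.topic);
    }
}
```

The parser rejects names without a domain prefix, which keeps the persistent and non-persistent cases explicit rather than relying on a default.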

Manage topic resources

For both persistent and non-persistent topics, you can obtain topic resources through the pulsar-admin tool, the REST API, and Java.

Note
In the REST API, :schema stands for persistent or non-persistent. :tenant, :namespace, and :x are variables; replace them with the real tenant, namespace, and x names when you use them.
Take GET /admin/v2/:schema/:tenant/:namespace as an example. To get the list of persistent topics through the REST API, use https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace. To get the list of non-persistent topics, use https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace.
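The placeholder substitution described in the note can be sketched in a few lines of Java. The class AdminPathTemplate and its fill method are hypothetical names used only for this example; the sketch simply replaces each path segment that starts with a colon by the value supplied for it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not a Pulsar class. */
final class AdminPathTemplate {

    /** Replaces every ":name" path segment with the value registered under "name". */
    static String fill(String template, Map<String, String> vars) {
        StringBuilder out = new StringBuilder();
        for (String segment : template.split("/")) {
            if (segment.isEmpty()) {
                continue; // the empty piece in front of the leading '/'
            }
            String value = segment;
            if (segment.startsWith(":")) {
                value = vars.get(segment.substring(1));
                if (value == null) {
                    throw new IllegalArgumentException("No value for " + segment);
                }
            }
            out.append('/').append(value);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("schema", "non-persistent");
        vars.put("tenant", "my-tenant");
        vars.put("namespace", "my-namespace");
        System.out.println(fill("/admin/v2/:schema/:tenant/:namespace", vars));
    }
}
```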

List topics

You can get the list of topics under a given namespace in the following ways.

pulsar-admin

  $ pulsar-admin topics list \
    my-tenant/my-namespace

REST API

GET /admin/v2/:schema/:tenant/:namespace

Java

  String namespace = "my-tenant/my-namespace";
  admin.topics().getList(namespace);

Grant permission

You can grant permissions on a client role to perform specific actions on a given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics grant-permission \
    --actions produce,consume --role application1 \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String role = "test-role";
  Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
  admin.topics().grantPermission(topic, role, actions);

Get permission

You can fetch the permissions granted on a topic in the following ways.

pulsar-admin

  $ pulsar-admin topics permissions \
    persistent://test-tenant/ns1/tp1

  {
    "application1": [
      "consume",
      "produce"
    ]
  }

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/permissions

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getPermissions(topic);

Revoke permission

You can revoke a permission granted on a client role in the following ways.

pulsar-admin

  $ pulsar-admin topics revoke-permission \
    --role application1 \
    persistent://test-tenant/ns1/tp1

  {
    "application1": [
      "consume",
      "produce"
    ]
  }

REST API

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String role = "test-role";
  admin.topics().revokePermissions(topic, role);

Delete topic

You can delete a topic in the following ways. You cannot delete a topic if any active subscriptions or producers are connected to it.

pulsar-admin

  $ pulsar-admin topics delete \
    persistent://test-tenant/ns1/tp1

REST API

DELETE /admin/v2/:schema/:tenant/:namespace/:topic

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().delete(topic);

Unload topic

You can unload a topic in the following ways.

pulsar-admin

  $ pulsar-admin topics unload \
    persistent://test-tenant/ns1/tp1

REST API

PUT /admin/v2/:schema/:tenant/:namespace/:topic/unload

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().unload(topic);

Get stats

You can check the following statistics of a given non-partitioned topic.

  • msgRateIn: The sum of all local and replication publishers’ publish rates (msg/s).

  • msgThroughputIn: The sum of all local and replication publishers’ publish rates (bytes/s).

  • msgRateOut: The sum of all local and replication consumers’ dispatch rates (msg/s).

  • msgThroughputOut: The sum of all local and replication consumers’ dispatch rates (bytes/s).

  • averageMsgSize: The average size (in bytes) of messages published within the last interval.

  • storageSize: The sum of the ledgers’ storage size for this topic. The space used to store the messages for the topic.

  • publishers: The list of all local publishers into the topic. The list ranges from zero to thousands.

    • msgRateIn: The total rate of messages (msg/s) published by this publisher.

    • msgThroughputIn: The total throughput (bytes/s) of the messages published by this publisher.

    • averageMsgSize: The average message size in bytes from this publisher within the last interval.

    • producerId: The internal identifier for this producer on this topic.

    • producerName: The internal identifier for this producer, generated by the client library.

    • address: The IP address and source port for the connection of this producer.

    • connectedSince: The timestamp when this producer is created or reconnected last time.

  • subscriptions: The list of all local subscriptions to the topic.

    • my-subscription: The name of this subscription. It is defined by the client.

      • msgRateOut: The total rate of messages (msg/s) delivered on this subscription.

      • msgThroughputOut: The total throughput (bytes/s) delivered on this subscription.

      • msgBacklog: The number of messages in the subscription backlog.

      • type: The subscription type.

      • msgRateExpired: The rate at which messages were discarded instead of dispatched from this subscription due to TTL.

      • lastExpireTimestamp: The timestamp of the last message expire execution.

      • lastConsumedFlowTimestamp: The timestamp of the last flow command received.

      • lastConsumedTimestamp: The latest of the last-consumed timestamps of all consumers.

      • lastAckedTimestamp: The latest of the last-acknowledged timestamps of all consumers.

      • consumers: The list of connected consumers for this subscription.

        • msgRateOut: The total rate of messages (msg/s) delivered to the consumer.

        • msgThroughputOut: The total throughput (bytes/s) delivered to the consumer.

        • consumerName: The internal identifier for this consumer, generated by the client library.

        • availablePermits: The number of messages that the consumer has space for in the client library’s listen queue. 0 means the client library’s queue is full and receive() isn’t being called. A non-zero value means this consumer is ready for dispatched messages.

        • unackedMessages: The number of unacknowledged messages for the consumer.

        • blockedConsumerOnUnackedMsgs: The flag used to verify if the consumer is blocked due to reaching threshold of the unacknowledged messages.

        • lastConsumedTimestamp: The timestamp when the consumer reads a message the last time.

        • lastAckedTimestamp: The timestamp when the consumer acknowledges a message the last time.

  • replication: The stats for cross-colo replication of this topic.

    • msgRateIn: The total rate (msg/s) of messages received from the remote cluster.

    • msgThroughputIn: The total throughput (bytes/s) received from the remote cluster.

    • msgRateOut: The total rate of messages (msg/s) delivered to the replication-subscriber.

    • msgThroughputOut: The total throughput (bytes/s) delivered to the replication-subscriber.

    • msgRateExpired: The total rate of messages (msg/s) expired.

    • replicationBacklog: The number of messages pending to be replicated to remote cluster.

    • connected: Whether the outbound replicator is connected.

    • replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true.

    • inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker.

    • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If no local publishers are connected, the connection is automatically closed after one minute.

    • outboundConnection: The address of the outbound replication connection.

    • outboundConnectedSince: The timestamp of establishing outbound connection.

The following is an example of a topic status.

  {
    "msgRateIn": 4641.528542257553,
    "msgThroughputIn": 44663039.74947473,
    "msgRateOut": 0,
    "msgThroughputOut": 0,
    "averageMsgSize": 1232439.816728665,
    "storageSize": 135532389160,
    "publishers": [
      {
        "msgRateIn": 57.855383881403576,
        "msgThroughputIn": 558994.7078932219,
        "averageMsgSize": 613135,
        "producerId": 0,
        "producerName": null,
        "address": null,
        "connectedSince": null
      }
    ],
    "subscriptions": {
      "my-topic_subscription": {
        "msgRateOut": 0,
        "msgThroughputOut": 0,
        "msgBacklog": 116632,
        "type": null,
        "msgRateExpired": 36.98245516804671,
        "consumers": []
      }
    },
    "replication": {}
  }

To get the status of a topic, you can use the following ways.

pulsar-admin

  $ pulsar-admin topics stats \
    persistent://test-tenant/ns1/tp1

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getStats(topic);

Get internal stats

You can get the detailed statistics of a topic.

  • entriesAddedCounter: Messages published since this broker loaded this topic.

  • numberOfEntries: The total number of messages being tracked.

  • totalSize: The total storage size in bytes of all messages.

  • currentLedgerEntries: The count of messages written to the ledger that is currently open for writing.

  • currentLedgerSize: The size in bytes of messages written to the ledger that is currently open for writing.

  • lastLedgerCreatedTimestamp: The time when the last ledger is created.

  • lastLedgerCreationFailureTimestamp: The time when the last ledger creation failed.

  • waitingCursorsCount: The number of cursors that are “caught up” and waiting for a new message to be published.

  • pendingAddEntriesCount: The number of messages whose (asynchronous) write requests are still waiting for completion.

  • lastConfirmedEntry: The ledgerid:entryid of the last message that is written successfully. If the entryid is -1, then the ledger is open, yet no entries are written.

  • state: The state of this ledger for writing. The state LedgerOpened means that a ledger is open for saving published messages.

  • ledgers: The ordered list of all ledgers for this topic holding messages.

    • ledgerId: The ID of this ledger.

    • entries: The total number of entries that belong to this ledger.

    • size: The size of messages written to this ledger (in bytes).

    • offloaded: Whether this ledger is offloaded.

  • compactedLedger: The ledgers holding un-acked messages after topic compaction.

    • ledgerId: The ID of this ledger.

    • entries: The total number of entries that belong to this ledger.

    • size: The size of messages written to this ledger (in bytes).

    • offloaded: Whether this ledger is offloaded. The value is false for the compacted topic ledger.

  • cursors: The list of all cursors on this topic. Each subscription in the topic stats has a cursor.

    • markDeletePosition: All messages before the markDeletePosition are acknowledged by the subscriber.

    • readPosition: The latest position of subscriber for reading message.

    • waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting for new messages to be published.

    • pendingReadOps: The number of outstanding read requests to BookKeeper that are in progress.

    • messagesConsumedCounter: The number of messages this cursor has acked since this broker loaded this topic.

    • cursorLedger: The ledger being used to persistently store the current markDeletePosition.

    • cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition.

    • individuallyDeletedMessages: If acknowledgments are made out of order, this field shows the ranges of messages acknowledged between the markDeletePosition and the readPosition.

    • lastLedgerSwitchTimestamp: The last time the cursor ledger is rolled over.

    • state: The state of the cursor ledger: Open means you have a cursor ledger for saving updates of the markDeletePosition.

The following is an example of the detailed statistics of a topic.

  {
    "entriesAddedCounter": 20449518,
    "numberOfEntries": 3233,
    "totalSize": 331482,
    "currentLedgerEntries": 3233,
    "currentLedgerSize": 331482,
    "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
    "lastLedgerCreationFailureTimestamp": null,
    "waitingCursorsCount": 1,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "324711539:3232",
    "state": "LedgerOpened",
    "ledgers": [
      {
        "ledgerId": 324711539,
        "entries": 0,
        "size": 0,
        "offloaded": true
      }
    ],
    "compactedLedger": {
      "ledgerId": 324711540,
      "entries": 10,
      "size": 100,
      "offloaded": false
    },
    "cursors": {
      "my-subscription": {
        "markDeletePosition": "324711539:3133",
        "readPosition": "324711539:3233",
        "waitingReadOp": true,
        "pendingReadOps": 0,
        "messagesConsumedCounter": 20449501,
        "cursorLedger": 324702104,
        "cursorLedgerLastEntry": 21,
        "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
        "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
        "state": "Open"
      }
    }
  }
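Positions such as lastConfirmedEntry, markDeletePosition, and readPosition in the example are written as ledgerid:entryid. The sketch below, which uses a hypothetical LedgerPosition class written only for this illustration, parses such a string and computes the difference in entry IDs between two positions in the same ledger. That difference is only a rough indication of how far the read position is ahead of the mark-delete position.

```java
/** Hypothetical helper for illustration only; not a Pulsar client class. */
final class LedgerPosition {
    final long ledgerId;
    final long entryId;

    LedgerPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** Parses "ledgerId:entryId", for example "324711539:3232". */
    static LedgerPosition parse(String text) {
        int colon = text.indexOf(':');
        if (colon <= 0 || colon == text.length() - 1) {
            throw new IllegalArgumentException("Expected ledgerId:entryId but got: " + text);
        }
        long ledgerId = Long.parseLong(text.substring(0, colon));
        // The entry ID can be -1 when the ledger is open but nothing is written yet.
        long entryId = Long.parseLong(text.substring(colon + 1));
        return new LedgerPosition(ledgerId, entryId);
    }

    /** Difference in entry IDs; only defined here when both positions share a ledger. */
    static long entryGap(LedgerPosition from, LedgerPosition to) {
        if (from.ledgerId != to.ledgerId) {
            throw new IllegalArgumentException("Positions are in different ledgers");
        }
        return to.entryId - from.entryId;
    }

    public static void main(String[] args) {
        LedgerPosition markDelete = parse("324711539:3133");
        LedgerPosition read = parse("324711539:3233");
        System.out.println("Entry ID gap: " + entryGap(markDelete, read));
    }
}
```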

To get the internal status of a topic, you can use the following ways.

pulsar-admin

  $ pulsar-admin topics stats-internal \
    persistent://test-tenant/ns1/tp1

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getInternalStats(topic);

Peek messages

You can peek a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics peek-messages \
    --count 10 --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

  Message ID: 315674752:0
  Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
  msg-payload

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  int numMessages = 1;
  admin.topics().peekMessages(topic, subName, numMessages);

Get message by ID

You can fetch the message with the given ledger ID and entry ID in the following ways.

pulsar-admin

  $ ./bin/pulsar-admin topics get-message-by-id \
    persistent://public/default/my-topic \
    -l 10 -e 0

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  long ledgerId = 10;
  long entryId = 10;
  admin.topics().getMessageById(topic, ledgerId, entryId);

Skip messages

You can skip a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics skip \
    --count 10 --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  int numMessages = 1;
  admin.topics().skipMessages(topic, subName, numMessages);

Skip all messages

You can skip all the old messages for a specific subscription of a given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics skip-all \
    --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip_all

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  admin.topics().skipAllMessages(topic, subName);

Reset cursor

You can reset a subscription cursor position back to the position that was recorded X minutes before. It essentially calculates the time and position of the cursor at X minutes before and resets it to that position. You can reset the cursor in the following ways.

pulsar-admin

  $ pulsar-admin topics reset-cursor \
    --subscription my-subscription --time 10 \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  long timestamp = 2342343L;
  admin.topics().resetCursor(topic, subName, timestamp);
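The REST and Java variants take a timestamp rather than a number of minutes. A minimal sketch of deriving such a timestamp for "X minutes ago" follows, assuming the timestamp is expressed in milliseconds since the Unix epoch; that unit is an assumption of this example, and ResetCursorTime is a hypothetical helper name.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not a Pulsar class. */
final class ResetCursorTime {

    /**
     * Returns the timestamp that lies the given number of minutes before nowMillis.
     * Assumes epoch milliseconds, which is an assumption of this sketch.
     */
    static long minutesAgo(long nowMillis, long minutes) {
        if (minutes < 0) {
            throw new IllegalArgumentException("minutes must not be negative");
        }
        return nowMillis - TimeUnit.MINUTES.toMillis(minutes);
    }

    public static void main(String[] args) {
        long timestamp = minutesAgo(System.currentTimeMillis(), 10);
        System.out.println("Timestamp for 10 minutes ago: " + timestamp);
    }
}
```

Passing the current time as a parameter, instead of reading the clock inside the method, keeps the calculation easy to check with fixed inputs.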

Look up topic

You can locate the broker URL that is serving the given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics lookup \
    persistent://test-tenant/ns1/tp1

  "pulsar://broker1.org.com:4480"

REST API

GET /lookup/v2/topic/:schema/:tenant/:namespace/:topic

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.lookup().lookupDestination(topic);

Get bundle

You can check the range of the bundle that contains the given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics bundle-range \
    persistent://test-tenant/ns1/tp1

  "0x00000000_0xffffffff"

REST API

GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.lookup().getBundleRange(topic);

Get subscriptions

You can check all subscription names for a given topic in the following ways.

pulsar-admin

  $ pulsar-admin topics subscriptions \
    persistent://test-tenant/ns1/tp1

  my-subscription

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getSubscriptions(topic);

Unsubscribe

When a subscription no longer processes messages, you can unsubscribe it in the following ways.

pulsar-admin

  $ pulsar-admin topics unsubscribe \
    --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subscription

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subscriptionName = "my-subscription";
  admin.topics().deleteSubscription(topic, subscriptionName);

Last message ID

You can get the last committed message ID for a persistent topic in the following ways. This feature is available since the 2.3.0 release.

pulsar-admin

  $ pulsar-admin topics last-message-id topic-name

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/lastMessageId

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getLastMessageId(topic);

Manage non-partitioned topics

You can use the Pulsar admin API to create, delete, and check the status of non-partitioned topics.

Create

Non-partitioned topics must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create non-partitioned topics in the following ways.

pulsar-admin

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.

  $ bin/pulsar-admin topics create \
    persistent://my-tenant/my-namespace/my-topic

Note
When you create a non-partitioned topic whose name has the suffix '-partition-' followed by a numeric value, like 'xyz-topic-partition-x', and a partitioned topic with the same base name ('xyz-topic', with y partitions) exists, the numeric value (x) of the non-partitioned topic must be larger than the number of partitions (y) of the partitioned topic. Otherwise, you cannot create such a non-partitioned topic.

REST API

PUT /admin/v2/:schema/:tenant/:namespace/:topic

Java

  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().createNonPartitionedTopic(topicName);

Delete

You can delete non-partitioned topics in the following ways.

pulsar-admin

  $ bin/pulsar-admin topics delete \
    persistent://my-tenant/my-namespace/my-topic

REST API

DELETE /admin/v2/:schema/:tenant/:namespace/:topic

Java

  admin.topics().delete(topic);

List

You can get the list of topics under a given namespace in the following ways.

pulsar-admin

  $ pulsar-admin topics list tenant/namespace

  persistent://tenant/namespace/topic1
  persistent://tenant/namespace/topic2

REST API

GET /admin/v2/:schema/:tenant/:namespace

Java

  admin.topics().getList(namespace);

Stats

You can check the current statistics of a given topic. The following is an example. For a description of each stat, refer to get stats.

  {
    "msgRateIn": 4641.528542257553,
    "msgThroughputIn": 44663039.74947473,
    "msgRateOut": 0,
    "msgThroughputOut": 0,
    "averageMsgSize": 1232439.816728665,
    "storageSize": 135532389160,
    "publishers": [
      {
        "msgRateIn": 57.855383881403576,
        "msgThroughputIn": 558994.7078932219,
        "averageMsgSize": 613135,
        "producerId": 0,
        "producerName": null,
        "address": null,
        "connectedSince": null
      }
    ],
    "subscriptions": {
      "my-topic_subscription": {
        "msgRateOut": 0,
        "msgThroughputOut": 0,
        "msgBacklog": 116632,
        "type": null,
        "msgRateExpired": 36.98245516804671,
        "consumers": []
      }
    },
    "replication": {}
  }

You can check the current statistics of a given topic and its connected producers and consumers in the following ways.

pulsar-admin

  $ pulsar-admin topics stats \
    persistent://test-tenant/namespace/topic \
    --get-precise-backlog

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats

Java

  admin.topics().getStats(topic, false /* is precise backlog */);

Manage partitioned topics

You can use Pulsar admin API to create, update, delete and check status of partitioned topics.

Create

Partitioned topics must be explicitly created. When creating a new partitioned topic, you need to provide a name and the number of partitions for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create partitioned topics in the following ways.

pulsar-admin

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.

  $ bin/pulsar-admin topics create-partitioned-topic \
    persistent://my-tenant/my-namespace/my-topic \
    --partitions 4

Note
If a non-partitioned topic whose name has the suffix '-partition-' followed by a numeric value, like 'xyz-topic-partition-10', already exists, you cannot create a partitioned topic named 'xyz-topic', because the partitions of the partitioned topic could override the existing non-partitioned topic. To create such a partitioned topic, you have to delete that non-partitioned topic first.

REST API

PUT /admin/v2/:schema/:tenant/:namespace/:topic/partitions

Java

  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  int numPartitions = 4;
  admin.topics().createPartitionedTopic(topicName, numPartitions);
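The naming conflict described in the note above arises because each partition of a partitioned topic is an internal topic named with the '-partition-' suffix and the partition index. The sketch below shows one way to detect such a name and check whether an existing non-partitioned topic would collide with a new partitioned topic. PartitionNames and its methods are hypothetical names for this illustration, and the check is a simplified reading of the note rather than the broker's actual validation logic.

```java
/** Hypothetical helper for illustration only; not the broker's validation code. */
final class PartitionNames {
    private static final String SUFFIX = "-partition-";

    /** Returns n for names like "base-partition-n", or -1 when there is no such suffix. */
    static int partitionIndex(String topic) {
        int at = topic.lastIndexOf(SUFFIX);
        if (at <= 0) {
            return -1; // no suffix, or nothing in front of it
        }
        String digits = topic.substring(at + SUFFIX.length());
        if (digits.isEmpty()) {
            return -1;
        }
        for (int i = 0; i < digits.length(); i++) {
            char c = digits.charAt(i);
            if (c < '0' || c > '9') {
                return -1;
            }
        }
        try {
            return Integer.parseInt(digits);
        } catch (NumberFormatException e) {
            return -1; // too large to be a partition index
        }
    }

    /** True when the existing topic looks like a partition of the new partitioned topic. */
    static boolean blocksPartitionedTopic(String existingNonPartitioned, String newPartitioned) {
        if (partitionIndex(existingNonPartitioned) < 0) {
            return false;
        }
        int at = existingNonPartitioned.lastIndexOf(SUFFIX);
        return existingNonPartitioned.substring(0, at).equals(newPartitioned);
    }

    public static void main(String[] args) {
        System.out.println(blocksPartitionedTopic("xyz-topic-partition-10", "xyz-topic"));
    }
}
```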

Create missed partitions

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the create-missed-partitions command to create partitions for the topic.

pulsar-admin

You can create missed partitions with the create-missed-partitions command and specify the topic name as an argument.

  $ bin/pulsar-admin topics create-missed-partitions \
    persistent://my-tenant/my-namespace/my-topic

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic

Java

  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().createMissedPartitions(topicName);

Get metadata

Partitioned topics are associated with metadata, which you can view as a JSON object. The following metadata field is available.

Field        Description
partitions   The number of partitions into which the topic is divided.

pulsar-admin

You can check the number of partitions in a partitioned topic with the get-partitioned-topic-metadata subcommand.

  $ pulsar-admin topics get-partitioned-topic-metadata \
    persistent://my-tenant/my-namespace/my-topic

  {
    "partitions": 4
  }

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitions

Java

  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().getPartitionedTopicMetadata(topicName);

Update

You can update the number of partitions for an existing partitioned topic if the topic is non-global. However, you can only increase the number of partitions. Decreasing the number of partitions would delete the topic, which is not supported in Pulsar.

Producers and consumers can find the newly created partitions automatically.

pulsar-admin

You can update partitioned topics with the update-partitioned-topic command.

  $ pulsar-admin topics update-partitioned-topic \
    persistent://my-tenant/my-namespace/my-topic \
    --partitions 8

REST API

POST /admin/v2/:schema/:tenant/:namespace/:topic/partitions

Java

  admin.topics().updatePartitionedTopic(topic, numPartitions);

Delete

You can delete partitioned topics with the delete-partitioned-topic command, the REST API, and Java.

pulsar-admin

  $ bin/pulsar-admin topics delete-partitioned-topic \
    persistent://my-tenant/my-namespace/my-topic

REST API

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/partitions

Java

  admin.topics().delete(topic);

List

You can get the list of topics under a given namespace in the following ways.

pulsar-admin

  $ pulsar-admin topics list tenant/namespace

  persistent://tenant/namespace/topic1
  persistent://tenant/namespace/topic2

REST API

GET /admin/v2/:schema/:tenant/:namespace

Java

  admin.topics().getList(namespace);

Stats

You can check the current statistics of a given partitioned topic. The following is an example. For a description of each stat, refer to get stats.

  {
    "msgRateIn" : 999.992947159793,
    "msgThroughputIn" : 1070918.4635439808,
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "bytesInCounter" : 270318763,
    "msgInCounter" : 252489,
    "bytesOutCounter" : 0,
    "msgOutCounter" : 0,
    "averageMsgSize" : 1070.926056966454,
    "msgChunkPublished" : false,
    "storageSize" : 270316646,
    "backlogSize" : 200921133,
    "publishers" : [ {
      "msgRateIn" : 999.992947159793,
      "msgThroughputIn" : 1070918.4635439808,
      "averageMsgSize" : 1070.3333333333333,
      "chunkedMessageRate" : 0.0,
      "producerId" : 0
    } ],
    "subscriptions" : {
      "test" : {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0,
        "msgBacklog" : 144318,
        "msgBacklogNoDelayed" : 144318,
        "blockedSubscriptionOnUnackedMsgs" : false,
        "msgDelayed" : 0,
        "unackedMessages" : 0,
        "msgRateExpired" : 0.0,
        "lastExpireTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastAckedTimestamp" : 0,
        "consumers" : [ ],
        "isDurable" : true,
        "isReplicated" : false
      }
    },
    "replication" : { },
    "metadata" : {
      "partitions" : 3
    },
    "partitions" : { }
  }

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.

pulsar-admin

  $ pulsar-admin topics partitioned-stats \
    persistent://test-tenant/namespace/topic \
    --per-partition

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitioned-stats

Java

  admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);

Internal stats

You can check the detailed statistics of a topic. The following is an example. For description of each stats, refer to get internal stats.

  {
    "entriesAddedCounter": 20449518,
    "numberOfEntries": 3233,
    "totalSize": 331482,
    "currentLedgerEntries": 3233,
    "currentLedgerSize": 331482,
    "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
    "lastLedgerCreationFailureTimestamp": null,
    "waitingCursorsCount": 1,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "324711539:3232",
    "state": "LedgerOpened",
    "ledgers": [
      {
        "ledgerId": 324711539,
        "entries": 0,
        "size": 0
      }
    ],
    "cursors": {
      "my-subscription": {
        "markDeletePosition": "324711539:3133",
        "readPosition": "324711539:3233",
        "waitingReadOp": true,
        "pendingReadOps": 0,
        "messagesConsumedCounter": 20449501,
        "cursorLedger": 324702104,
        "cursorLedgerLastEntry": 21,
        "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
        "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
        "state": "Open"
      }
    }
  }

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin

  $ pulsar-admin topics stats-internal \
    persistent://test-tenant/namespace/topic

REST API

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats

Java

  admin.topics().getInternalStats(topic);

Publish to partitioned topics

By default, Pulsar topics are served by a single broker, which limits the maximum throughput of a topic. Partitioned topics can span multiple brokers and thus allow for higher throughput.

You can publish to partitioned topics using Pulsar client libraries. When publishing to partitioned topics, you must specify a routing mode. If you do not specify any routing mode when you create a new producer, the round robin routing mode is used.

Routing mode

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. The routing mode determines which partition (internal topic) each message is published to.

The following MessageRoutingMode options are available.

Mode                  Description
RoundRobinPartition   If no key is provided, the producer publishes messages across all partitions in a round-robin policy to achieve the maximum throughput. Round-robin is not applied per individual message; the partition switches on the same boundary as the batching delay so that batching stays effective. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. This is the default mode.
SinglePartition       If no key is provided, the producer picks a single partition randomly and publishes all messages into that partition. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.
CustomPartition       Uses a custom message router implementation that is called to determine the partition for a particular message. You can create a custom routing mode by using the Java client and implementing the MessageRouter interface.

The following is an example:

  String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  String topic = "persistent://my-tenant/my-namespace/my-topic";
  PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  Producer<byte[]> producer = pulsarClient.newProducer()
      .topic(topic)
      .messageRoutingMode(MessageRoutingMode.SinglePartition)
      .create();
  producer.send("Partitioned topic message".getBytes());

Custom message router

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:

  public interface MessageRouter extends Serializable {
      int choosePartition(Message msg);
  }

The following router sends every message to partition 10:

  public class AlwaysTenRouter implements MessageRouter {
      public int choosePartition(Message msg) {
          return 10;
      }
  }

With that implementation, you can send messages as follows:

  String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
  PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  Producer<byte[]> producer = pulsarClient.newProducer()
      .topic(topic)
      .messageRouter(new AlwaysTenRouter())
      .create();
  producer.send("Partitioned topic message".getBytes());

How to choose partitions when using a key

If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose the partition when using a key.

  // If the message has a key, it supersedes the round-robin routing policy.
  if (msg.hasKey()) {
      return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
  }
  if (isBatchingEnabled) { // If batching is enabled, choose the partition on the `partitionSwitchMs` boundary.
      long currentMs = clock.millis();
      return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
  } else {
      return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
  }
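The snippet above depends on client internals such as signSafeMod and hash.makeHash. The following self-contained sketch mimics the same decision for the keyed and non-batched cases so that it can be run on its own. RoutingSketch is a hypothetical class, String.hashCode() stands in for the client's configurable hash function, and the signSafeMod shown here is an assumed implementation of a modulo that never returns a negative result.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for illustration only; not the Pulsar client's router. */
final class RoutingSketch {
    // Round-robin counter used when a message has no key.
    private final AtomicInteger counter = new AtomicInteger();

    /** Modulo that stays in [0, divisor) even for negative dividends. */
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        if (mod < 0) {
            mod += divisor;
        }
        return mod;
    }

    /** Keyed messages map to a fixed partition; unkeyed messages rotate. */
    int choosePartition(String key, int numPartitions) {
        if (key != null) {
            // Assumption: String.hashCode() replaces the client's hash function.
            return signSafeMod(key.hashCode(), numPartitions);
        }
        return signSafeMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoutingSketch router = new RoutingSketch();
        System.out.println("keyed:   " + router.choosePartition("order-42", 4));
        System.out.println("unkeyed: " + router.choosePartition(null, 4));
        System.out.println("unkeyed: " + router.choosePartition(null, 4));
    }
}
```

Because the partition for a key depends only on the hash and the partition count, messages with the same key keep landing on the same partition for as long as the number of partitions is unchanged.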