Get started

This guide shows the quickest way to start managing topics with each of the following methods:

  • pulsar-admin
  • REST API
  • Java

The pulsar-admin CLI is a command-line tool available in the bin folder of your Pulsar installation.

The REST API consists of HTTP calls made against the admin APIs provided by brokers. Both the Java admin API and the pulsar-admin CLI use the REST API.

The Java admin API is a programmable interface written in Java.

Check the detailed steps below.

To manage topics using the pulsar-admin CLI, complete the following steps.

  1. Set the service URL.

  2. Create a partitioned topic.

  3. Update the number of partitions.

  4. Produce messages to the topic.

  5. Check the stats of the topic.

  6. Delete the topic.

Prerequisites

  • Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.

Steps

Step 1: Set the service URLs to point to the broker service in client.conf.

    webServiceUrl=http://localhost:8080/
    brokerServiceUrl=pulsar://localhost:6650/
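
The two settings above point at different endpoints: one is the HTTP admin endpoint and the other is the binary broker endpoint. As a minimal sketch, the following Java snippet reads the same two keys from client.conf-style text and prints the port of each URL. The class ClientConfCheck and its methods are hypothetical helpers written for this illustration, not part of Pulsar.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

/** Hypothetical helper that reads service URLs from client.conf-style text. */
final class ClientConfCheck {

    /** Parses key=value lines; java.util.Properties skips blank lines and '#' comments. */
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the port of the URL stored under the given key, or -1 if the URL has no port. */
    static int portOf(Properties props, String key) {
        return URI.create(props.getProperty(key)).getPort();
    }

    public static void main(String[] args) {
        String conf = "webServiceUrl=http://localhost:8080/\n"
                + "brokerServiceUrl=pulsar://localhost:6650/\n";
        Properties props = parse(conf);
        System.out.println("admin (HTTP) port: " + portOf(props, "webServiceUrl"));
        System.out.println("broker (binary) port: " + portOf(props, "brokerServiceUrl"));
    }
}
```

A check like this can catch a swapped or mistyped URL before you run any pulsar-admin command.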

Step 2: Create a persistent topic named test-topic-1 with 6 partitions.

Input

    bin/pulsar-admin topics create-partitioned-topic \
      persistent://public/default/test-topic-1 \
      --partitions 6

Output

There is no output. You can check the status of the topic in Step 5.
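
A partitioned topic is served as a set of internal topics, one per partition, whose names carry a -partition-N suffix (the stats output in Step 5 shows names of this form). The sketch below lists the names you can expect for the topic created above, assuming partition indices start at 0. PartitionNames is a hypothetical helper for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that derives per-partition topic names. */
final class PartitionNames {

    /** Returns "<topic>-partition-0" through "<topic>-partition-(partitions - 1)". */
    static List<String> partitionNames(String topic, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // Six partitions, as created in Step 2.
        partitionNames("persistent://public/default/test-topic-1", 6)
                .forEach(System.out::println);
    }
}
```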

Step 3: Update the number of partitions to 8.

Input

    bin/pulsar-admin topics update-partitioned-topic \
      persistent://public/default/test-topic-1 \
      --partitions 8

Output

There is no output. You can check the number of partitions in Step 5.

Step 4: Produce some messages to the partitioned topic test-topic-1.

Input

    bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1

Output

    2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
      "confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
      "serviceURL" : "pulsar://localhost:6650",
      "authPluginClassName" : "",
      "authParams" : "",
      "tlsTrustCertsFilePath" : "",
      "tlsAllowInsecureConnection" : false,
      "tlsHostnameVerificationEnable" : false,
      "maxConnections" : 1,
      "statsIntervalSeconds" : 1000,
      "ioThreads" : 1,
      "enableBusyWait" : false,
      "listenerName" : null,
      "listenerThreads" : 1,
      "maxLookupRequest" : 50000,
      "topics" : [ "persistent://public/default/test-topic-1" ],
      "numTestThreads" : 1,
      "msgRate" : 1000,
      "msgSize" : 1024,
      "numTopics" : 1,
      "numProducers" : 1,
      "separator" : "-",
      "sendTimeout" : 0,
      "producerName" : null,
      "adminURL" : "http://localhost:8080/",
    ...
    2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235
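
The final log line summarizes publish latency. If you want to compare runs, a small parser can turn that line into named values. The following is a sketch under the assumption that the line keeps the "label: number" layout shown above; PerfLatencyLine is a hypothetical helper, not part of pulsar-perf.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the "Aggregated latency stats" line printed by pulsar-perf. */
final class PerfLatencyLine {

    // Matches "<label>: <number>" pairs such as "mean: 8.931" or "99.9pct: 216.088".
    private static final Pattern FIELD =
            Pattern.compile("(mean|med|[\\d.]+pct|Max):\\s+([\\d.]+)");

    /** Returns the latency fields in the order they appear in the log line. */
    static Map<String, Double> parse(String logLine) {
        Map<String, Double> stats = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(logLine);
        while (m.find()) {
            stats.put(m.group(1), Double.parseDouble(m.group(2)));
        }
        return stats;
    }

    public static void main(String[] args) {
        String line = "Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775"
                + " - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807"
                + " - 99.999pct: 349.391 - Max: 351.235";
        parse(line).forEach((label, value) -> System.out.println(label + " = " + value));
    }
}
```

Only the mean carries an explicit "ms" unit in the log line; treating the remaining values as milliseconds is an assumption.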

Step 5: Check the internal stats of the partitioned topic test-topic-1.

Input

  1. bin/pulsar-admin topics partitioned-stats-internal \
  2. persistent://public/default/test-topic-1

Output

Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.

    {
      "metadata" : {
        "partitions" : 8
      },
      "partitions" : {
        "persistent://public/default/test-topic-1-partition-1" : {
          "entriesAddedCounter" : 4213,
          "numberOfEntries" : 4213,
          "totalSize" : 8817693,
          "currentLedgerEntries" : 4212,
          "currentLedgerSize" : 8806289,
          "lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00",
          "waitingCursorsCount" : 0,
          "pendingAddEntriesCount" : 0,
          "lastConfirmedEntry" : "65:4211",
          "state" : "LedgerOpened",
          "ledgers" : [ {
            "ledgerId" : 49,
            "entries" : 1,
            "size" : 11404,
            "offloaded" : false,
            "underReplicated" : false
          }, {
            "ledgerId" : 65,
            "entries" : 0,
            "size" : 0,
            "offloaded" : false,
            "underReplicated" : false
          } ],
          "cursors" : {
            "test-subscriptio-1" : {
              "markDeletePosition" : "49:-1",
              "readPosition" : "49:0",
              "waitingReadOp" : false,
              "pendingReadOps" : 0,
              "messagesConsumedCounter" : 0,
              "cursorLedger" : -1,
              "cursorLedgerLastEntry" : -1,
              "individuallyDeletedMessages" : "[]",
              "lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
              "state" : "NoLedger",
              "numberOfEntriesSinceFirstNotAckedMessage" : 1,
              "totalNonContiguousDeletedMessagesRange" : 0,
              "subscriptionHavePendingRead" : false,
              "subscriptionHavePendingReplayRead" : false,
              "properties" : { }
            },
            "test-subscription-1" : {
              "markDeletePosition" : "49:-1",
              "readPosition" : "49:0",
              "waitingReadOp" : false,
              "pendingReadOps" : 0,
              "messagesConsumedCounter" : 0,
              "cursorLedger" : -1,
              "cursorLedgerLastEntry" : -1,
              "individuallyDeletedMessages" : "[]",
              "lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
              "state" : "NoLedger",
              "numberOfEntriesSinceFirstNotAckedMessage" : 1,
              "totalNonContiguousDeletedMessagesRange" : 0,
              "subscriptionHavePendingRead" : false,
              "subscriptionHavePendingReplayRead" : false,
              "properties" : { }
            }
          },
          "schemaLedgers" : [ ],
          "compactedLedger" : {
            "ledgerId" : -1,
            "entries" : -1,
            "size" : -1,
            "offloaded" : false,
            "underReplicated" : false
          }
        },
        ...
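
Position fields in this output, such as lastConfirmedEntry ("65:4211") and markDeletePosition ("49:-1"), use a ledgerId:entryId format. The sketch below splits such a string into its two numbers. LedgerPosition is a hypothetical class for illustration; the reading of the sample values in the comments is an interpretation of the output above, not an authoritative description.

```java
/** Hypothetical value class for a "ledgerId:entryId" position string. */
final class LedgerPosition {
    final long ledgerId;
    final long entryId;

    private LedgerPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** Parses strings such as "65:4211" or "49:-1". */
    static LedgerPosition parse(String position) {
        int sep = position.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Expected ledgerId:entryId, got: " + position);
        }
        long ledger = Long.parseLong(position.substring(0, sep));
        long entry = Long.parseLong(position.substring(sep + 1));
        return new LedgerPosition(ledger, entry);
    }

    public static void main(String[] args) {
        // Entry IDs are zero-based, so entry 4211 is the 4212th entry of ledger 65,
        // which is consistent with "currentLedgerEntries" : 4212 in the output above.
        LedgerPosition last = parse("65:4211");
        System.out.println("ledger=" + last.ledgerId + " entry=" + last.entryId);

        // An entry ID of -1 appears here for cursors that have not consumed any message.
        LedgerPosition markDelete = parse("49:-1");
        System.out.println("ledger=" + markDelete.ledgerId + " entry=" + markDelete.entryId);
    }
}
```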

Step 6: Delete the topic test-topic-1.

Input

    bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1

Output

There is no output. You can verify whether test-topic-1 still exists by using the following command.

Input

List the topics in the public/default namespace.

    bin/pulsar-admin topics list public/default

To manage topics using REST API, complete the following steps.

  1. Create a partitioned topic.

  2. Update the number of partitions.

  3. Produce messages to the topic.

  4. Check the stats of the topic.

  5. Delete the topic.

Prerequisites

  • Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.

Steps

Step 1: Create a persistent topic named test-topic-2 with 4 partitions.

Input

    curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"

Output

There is no output. You can check the topic in Step 4.

Step 2: Update the number of partitions to 5.

Input

    curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"

Output

There is no output. You can check the status of the topic in Step 4.
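
Steps 1 and 2 call the same /partitions endpoint and differ only in the HTTP method: PUT creates the partitioned topic and POST updates its partition count. As a sketch of the same two calls from Java 11 or later, the following builds the requests with java.net.http without sending them. PartitionRequests and its method names are hypothetical; the URL layout and headers are copied from the curl commands above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for the partition create and update requests. */
final class PartitionRequests {
    static final String ADMIN_BASE = "http://localhost:8080/admin/v2";

    /** Builds the /partitions URL for a persistent topic. */
    static URI partitionsUri(String tenant, String namespace, String topic) {
        return URI.create(ADMIN_BASE + "/persistent/" + tenant + "/" + namespace
                + "/" + topic + "/partitions");
    }

    /** PUT with the partition count as the JSON body creates the topic. */
    static HttpRequest create(String tenant, String namespace, String topic, int partitions) {
        return HttpRequest.newBuilder(partitionsUri(tenant, namespace, topic))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(Integer.toString(partitions)))
                .build();
    }

    /** POST with the new partition count as the JSON body updates the topic. */
    static HttpRequest update(String tenant, String namespace, String topic, int partitions) {
        return HttpRequest.newBuilder(partitionsUri(tenant, namespace, topic))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(Integer.toString(partitions)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest create = create("public", "default", "test-topic-2", 4);
        HttpRequest update = update("public", "default", "test-topic-2", 5);
        System.out.println(create.method() + " " + create.uri());
        System.out.println(update.method() + " " + update.uri());
    }
}
```

To execute a request against a running broker, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).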

Step 3: Produce some messages to the partitioned topic test-topic-2.

Input

    bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2

Output

    2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
      "confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
      "serviceURL" : "pulsar://localhost:6650",
      "authPluginClassName" : "",
      "authParams" : "",
      "tlsTrustCertsFilePath" : "",
      "tlsAllowInsecureConnection" : false,
      "tlsHostnameVerificationEnable" : false,
      "maxConnections" : 1,
      "statsIntervalSeconds" : 1000,
      "ioThreads" : 1,
      "enableBusyWait" : false,
      "listenerName" : null,
      "listenerThreads" : 1,
      "maxLookupRequest" : 50000,
      "topics" : [ "persistent://public/default/test-topic-2" ],
      "numTestThreads" : 1,
      "msgRate" : 1000,
      "msgSize" : 1024,
      "numTopics" : 1,
      "numProducers" : 1,
      "separator" : "-",
      "sendTimeout" : 0,
      "producerName" : null,
      "adminURL" : "http://localhost:8080/",
      "deprecatedAuthPluginClassName" : null,
      "maxOutstanding" : 0,
      "maxPendingMessagesAcrossPartitions" : 0,
      "partitions" : null,
      "numMessages" : 0,
      "compression" : "NONE",
      "payloadFilename" : null,
      "payloadDelimiter" : "\\n",
      "batchTimeMillis" : 1.0,
      "batchMaxMessages" : 1000,
      "batchMaxBytes" : 4194304,
      "testTime" : 0,
      "warmupTimeSeconds" : 1.0,
      "encKeyName" : null,
      "encKeyFile" : null,
      "delay" : 0,
      "exitOnFailure" : false,
      "messageKeyGenerationMode" : null,
      "producerAccessMode" : "Shared",
      "formatPayload" : false,
      "formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter",
      "transactionTimeout" : 10,
      "numMessagesPerTransaction" : 50,
      "isEnableTransaction" : false,
      "isAbortTransaction" : false,
      "histogramFile" : null
    }
    ...
    2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717

Step 4: Check the internal stats of the topic test-topic-2.

Input

    curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStats

Output

Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.

    {"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],...
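
The REST response reports counters per partition, so a topic-wide figure has to be summed across partitions. The sketch below adds up every entriesAddedCounter value in a response string. It uses a regular expression only to stay dependency-free; a JSON library is the more robust choice for real code. EntriesAddedSum is a hypothetical helper, and the sample string keeps only the two counters visible in the truncated output above under shortened, made-up keys.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that sums "entriesAddedCounter" across partitions. */
final class EntriesAddedSum {

    // Captures the numeric value of every "entriesAddedCounter" field.
    private static final Pattern COUNTER =
            Pattern.compile("\"entriesAddedCounter\"\\s*:\\s*(\\d+)");

    /** Returns the sum of all entriesAddedCounter values found in the JSON text. */
    static long sum(String json) {
        long total = 0;
        Matcher m = COUNTER.matcher(json);
        while (m.find()) {
            total += Long.parseLong(m.group(1));
        }
        return total;
    }

    public static void main(String[] args) {
        // Shortened sample: "p3" and "p2" stand in for the full partition names.
        String sample = "{\"partitions\":{"
                + "\"p3\":{\"entriesAddedCounter\":47087},"
                + "\"p2\":{\"entriesAddedCounter\":46995}}}";
        System.out.println(sum(sample)); // prints 94082
    }
}
```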

Step 5: Delete the topic test-topic-2.

Input

    curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions

Output

There is no output. You can verify whether test-topic-2 still exists by using the following command.

Input

List the topics in the public/default namespace.

    curl -X GET http://localhost:8080/admin/v2/persistent/public/default

To manage topics using the Java admin API, complete the following steps.

  1. Create a Pulsar admin client.

  2. Create a partitioned topic.

  3. Update the number of partitions.

  4. Produce messages to the topic.

  5. Check the stats of the topic.

  6. Delete the topic.

Prerequisites

  • Prepare a Java project and add the following dependency to your POM file.

    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client-admin</artifactId>
      <version>3.3.2</version>
    </dependency>

Steps

Step 1: Create a Pulsar admin client in your Java project.

Input

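    import org.apache.pulsar.client.admin.PulsarAdmin;

    // The admin client talks to the broker's HTTP (web service) endpoint.
    String url = "http://localhost:8080";
    PulsarAdmin admin = PulsarAdmin.builder()
            .serviceHttpUrl(url)
            .build();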

Step 2: Create a partitioned topic test-topic-1 with 4 partitions.

Input

    admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4);

Step 3: Update the number of partitions to 5.

Input

    admin.topics().updatePartitionedTopic("test-topic-1", 5);
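
Step 3 passes the short name test-topic-1, while Step 2 uses the full name persistent://public/default/test-topic-1. Both refer to the same topic because a short topic name resolves to the persistent domain, the public tenant, and the default namespace. The following is a simplified sketch of that resolution; TopicNames is a hypothetical helper and does not reproduce every validation the Pulsar client performs.

```java
/** Hypothetical helper that expands short topic names to full names. */
final class TopicNames {

    /**
     * Expands "topic" or "tenant/namespace/topic" to a full topic name.
     * Names that already contain "://" are returned unchanged.
     */
    static String toFullName(String topic) {
        if (topic.contains("://")) {
            return topic;
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            return "persistent://public/default/" + parts[0];
        }
        if (parts.length == 3) {
            return "persistent://" + parts[0] + "/" + parts[1] + "/" + parts[2];
        }
        throw new IllegalArgumentException("Invalid short topic name: " + topic);
    }

    public static void main(String[] args) {
        System.out.println(toFullName("test-topic-1"));
        System.out.println(toFullName("persistent://public/default/test-topic-1"));
    }
}
```

Using the full name everywhere avoids relying on these defaults, which matters as soon as you work outside the public/default namespace.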

Step 4: Produce some messages to the topic test-topic-1.

Input

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("test-topic-1")
            .blockIfQueueFull(true)
            .create();
    // Send 100 messages synchronously.
    for (int i = 0; i < 100; ++i) {
        producer.newMessage().value("test").send();
    }
    producer.close();
    client.close();

Step 5: Check the stats of the topic test-topic-1.

Input

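    import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
    // false: return only the aggregated stats, without per-partition details.
    PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1", false);
    System.out.println(stats.getMsgInCounter());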

Output

    100

Step 6: Delete the topic test-topic-1.

Input

    admin.topics().deletePartitionedTopic("test-topic-1");