Pulsar REST

Pulsar not only provides REST endpoints to manage resources in Pulsar clusters, but also provides methods to query the state for those resources. In addition, Pulsar REST provides a simple way to interact with Pulsar without using client libraries, which is convenient for applications to use HTTP to interact with Pulsar.

Connection

To connect to Pulsar, you need to specify a URL.

  • Produce messages to non-partitioned or partitioned topics

    1. brokerUrl:{8080/8081}/topics/{persistent/non-persistent}/{my-tenant}/{my-namespace}/{my-topic}
  • Produce messages to specific partitions of partitioned topics

    1. brokerUrl:{8080/8081}/topics/{persistent/non-persistent}/{my-tenant}/{my-namespace}/{my-topic}/partitions/{partition-number}

Producer

Currently, you can produce messages to the following destinations with tools like cURL or Postman via REST.

  • Non-partitioned or partitioned topics

  • Specific partitions of partitioned topics

REST - 图1note

You can only produce messages to topics that already exist in Pulsar via REST.

Consuming and reading messages via REST will be supported in the future.

Message

  • Below is the structure of a request payload.

    ParameterRequired?Description
    schemaVersionNoSchema version of existing schema used for this message

    You need provide one of the following:

    - schemaVersion
    - keySchema/valueSchema

    If both of them are provided, then schemaVersion is used
    keySchema/valueSchemaNoKey schema / Value schema used for this message
    producerNameNoProducer name
    Messages[] SingleMessageYesMessages to be sent
  • Below is the structure of a message.

    ParameterRequired?TypeDescription
    payloadYesStringActual message payload

    Messages are sent in strings and encoded with given schemas on the server side
    propertiesNoMap<String, String>Custom properties
    keyNoStringPartition key
    replicationClustersNoList<String>Clusters to which messages replicate
    eventTimeNoStringMessage event time
    sequenceIdNolongMessage sequence ID
    disableReplicationNobooleanWhether to disable replication of messages
    deliverAtNolongDeliver messages only at or after specified absolute timestamp
    deliverAfterMsNolongDeliver messages only after specified relative delay (in milliseconds)

Schema

  • Currently, Primitive, Avro, JSON, and KeyValue schemas are supported.

  • For Primitive, Avro and JSON schemas, schemas should be provided as the full schema encoded as a string.

  • If the schema is not set, messages are encoded with string schema.

Example

Below is an example of sending messages to topics using JSON schema via REST.

Assume that you send messages representing the following class.

  1. class Seller {
  2. public String state;
  3. public String street;
  4. public long zipCode;
  5. }
  6. class PC {
  7. public String brand;
  8. public String model;
  9. public int year;
  10. public GPU gpu;
  11. public Seller seller;
  12. }

Send messages to topics with JSON schema using the command below.

  1. curl --location --request POST 'brokerUrl:{8080/8081}/topics/{persistent/non-persistent}/{my-tenant}/{my-namespace}/{my-topic}' \
  2. --header 'Content-Type: application/json' \
  3. --data-raw '{
  4. "valueSchema": "{\"name\":\"\",\"schema\":\"eyJ0eXBlIjoicmVjb3JkIiwibmFtZSI6IlBDIiwibmFtZXNwYWNlIjoib3JnLmFwYWNoZS5wdWxzYXIuYnJva2VyLmFkbWluLlRvcGljc1Rlc3QiLCJmaWVsZHMiOlt7Im5hbWUiOiJicmFuZCIsInR5cGUiOlsibnVsbCIsInN0cmluZyJdLCJkZWZhdWx0IjpudWxsfSx7Im5hbWUiOiJncHUiLCJ0eXBlIjpbIm51bGwiLHsidHlwZSI6ImVudW0iLCJuYW1lIjoiR1BVIiwic3ltYm9scyI6WyJBTUQiLCJOVklESUEiXX1dLCJkZWZhdWx0IjpudWxsfSx7Im5hbWUiOiJtb2RlbCIsInR5cGUiOlsibnVsbCIsInN0cmluZyJdLCJkZWZhdWx0IjpudWxsfSx7Im5hbWUiOiJzZWxsZXIiLCJ0eXBlIjpbIm51bGwiLHsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZWxsZXIiLCJmaWVsZHMiOlt7Im5hbWUiOiJzdGF0ZSIsInR5cGUiOlsibnVsbCIsInN0cmluZyJdLCJkZWZhdWx0IjpudWxsfSx7Im5hbWUiOiJzdHJlZXQiLCJ0eXBlIjpbIm51bGwiLCJzdHJpbmciXSwiZGVmYXVsdCI6bnVsbH0seyJuYW1lIjoiemlwQ29kZSIsInR5cGUiOiJsb25nIn1dfV0sImRlZmF1bHQiOm51bGx9LHsibmFtZSI6InllYXIiLCJ0eXBlIjoiaW50In1dfQ==\",\"type\":\"JSON\",\"properties\":{\"__jsr310ConversionEnabled\":\"false\",\"__alwaysAllowNull\":\"true\"},\"schemaDefinition\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"PC\\\",\\\"namespace\\\":\\\"org.apache.pulsar.broker.admin.TopicsTest\\\",\\\"fields\\\":[{\\\"name\\\":\\\"brand\\\",\\\"type\\\":[\\\"null\\\",\\\"string\\\"],\\\"default\\\":null},{\\\"name\\\":\\\"gpu\\\",\\\"type\\\":[\\\"null\\\",{\\\"type\\\":\\\"enum\\\",\\\"name\\\":\\\"GPU\\\",\\\"symbols\\\":[\\\"AMD\\\",\\\"NVIDIA\\\"]}],\\\"default\\\":null},{\\\"name\\\":\\\"model\\\",\\\"type\\\":[\\\"null\\\",\\\"string\\\"],\\\"default\\\":null},{\\\"name\\\":\\\"seller\\\",\\\"type\\\":[\\\"null\\\",{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Seller\\\",\\\"fields\\\":[{\\\"name\\\":\\\"state\\\",\\\"type\\\":[\\\"null\\\",\\\"string\\\"],\\\"default\\\":null},{\\\"name\\\":\\\"street\\\",\\\"type\\\":[\\\"null\\\",\\\"string\\\"],\\\"default\\\":null},{\\\"name\\\":\\\"zipCode\\\",\\\"type\\\":\\\"long\\\"}]}],\\\"default\\\":null},{\\\"name\\\":\\\"year\\\",\\\"type\\\":\\\"int\\\"}]}\"}",
  5. // Schema data is just the base 64 encoded schemaDefinition.
  6. "producerName": "rest-producer",
  7. "messages": [
  8. {
  9. "key":"my-key",
  10. "payload":"{\"brand\":\"dell\",\"model\":\"alienware\",\"year\":2021,\"gpu\":\"AMD\",\"seller\":{\"state\":\"WA\",\"street\":\"main street\",\"zipCode\":98004}}",
  11. "eventTime":1603045262772,
  12. "sequenceId":1
  13. },
  14. {
  15. "key":"my-key",
  16. "payload":"{\"brand\":\"asus\",\"model\":\"rog\",\"year\":2020,\"gpu\":\"NVIDIA\",\"seller\":{\"state\":\"CA\",\"street\":\"back street\",\"zipCode\":90232}}",
  17. "eventTime":1603045262772,
  18. "sequenceId":2
  19. }
  20. ]
  21. }
  22. `
  23. // Sample message