Manage Schemas

Tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Manage schema

Upload a schema

To upload (register) a new schema for a topic, you can use one of the following methods.

  • Admin CLI
  • REST API
  • Java

Use the upload subcommand.

    pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

The schema-definition-file is in JSON format.

    {
      "type": "<schema-type>",
      "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
      "properties": {} // the properties associated with the schema
    }

Send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

The following curl example assumes that the payload is stored in the schema.json file, the Pulsar broker runs on localhost, and the topic is my-tenant/my-ns/my-topic:

    curl -X POST -H 'Content-Type: application/json' -d @schema.json http://localhost:8080/admin/v2/schemas/my-tenant/my-ns/my-topic/schema

The post payload is in JSON format.

    {
      "type": "<schema-type>",
      "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
      "properties": {} // the properties associated with the schema
    }
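The same POST request can also be assembled with the JDK's built-in HTTP client. The following is a minimal sketch, not part of the Pulsar client: the class name UploadSchemaRequest and the build helper are hypothetical, and the base URL and topic simply mirror the curl example. It only builds the request so that you can inspect it before sending.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class UploadSchemaRequest {
    // Builds (but does not send) the POST request for the schema upload endpoint.
    static HttpRequest build(String baseUrl, String tenant, String namespace,
                             String topic, String payloadJson) {
        URI uri = URI.create(String.format("%s/admin/v2/schemas/%s/%s/%s/schema",
                baseUrl, tenant, namespace, topic));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }

    public static void main(String[] args) {
        // A primitive schema, so the "schema" field stays blank.
        String payload = "{\"type\": \"INT8\", \"schema\": \"\", \"properties\": {}}";
        HttpRequest request = build("http://localhost:8080", "my-tenant", "my-ns", "my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to java.net.http.HttpClient (Java 11 or later); authentication and TLS settings depend on your cluster and are left out here.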

The method on the PulsarAdmin client (accessed through admin.schemas()) is:

    void createSchema(String topic, PostSchemaPayload schemaPayload)

Here is an example of PostSchemaPayload:

    PulsarAdmin admin = …;
    PostSchemaPayload payload = new PostSchemaPayload();
    payload.setType("INT8");
    // INT8 is a primitive schema, so the schema field stays blank
    payload.setSchema("");
    admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);

If the schema is a primitive schema, the schema field must be blank. If the schema is a struct schema, this field must be a JSON string of the Avro schema definition.

The payload includes the following fields:

  • type
      • Allowed values for primitive-type schemas are listed on the Primitive types page.
      • Allowed values for struct-type schemas are AVRO, PROTOBUF, PROTOBUF_NATIVE, and JSON.
  • schema: The schema definition data, encoded in UTF-8.
      • If the schema type is AVRO, PROTOBUF, or JSON, this field should be an Avro schema definition in JSON format.
      • If the schema type is PROTOBUF_NATIVE, this field should contain a Protobuf descriptor.
      • Otherwise, this field should be blank.
  • properties: The additional properties associated with the schema.
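The rule that ties the schema field to the type can be checked before a payload is uploaded. The following is a minimal sketch under stated assumptions: the class SchemaPayloadCheck and its isConsistent method are hypothetical helpers, not Pulsar APIs, and any type outside the four struct types is treated as primitive rather than checked against the Primitive types list.

```java
import java.util.Set;

public class SchemaPayloadCheck {
    // Struct schema types named in the field description.
    static final Set<String> STRUCT_TYPES =
            Set.of("AVRO", "PROTOBUF", "PROTOBUF_NATIVE", "JSON");

    // Returns true when the schema field is consistent with the type:
    // non-blank for struct types, blank for everything else.
    static boolean isConsistent(String type, String schema) {
        boolean blank = schema == null || schema.isBlank();
        return STRUCT_TYPES.contains(type) ? !blank : blank;
    }

    public static void main(String[] args) {
        System.out.println(isConsistent("INT8", ""));                     // true
        System.out.println(isConsistent("AVRO", ""));                     // false
        System.out.println(isConsistent("JSON", "{\"type\":\"record\"}")); // true
    }
}
```

This check covers only the blank or non-blank rule; it does not validate that the definition itself is a well-formed Avro schema or Protobuf descriptor.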

    The following is an example of the payload for a JSON schema:

        {
          "type": "JSON",
          "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
          "properties": {}
        }
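Because the schema field is a JSON string that itself contains JSON, every double quote in the Avro definition has to be escaped, as in the example above. The following is a minimal sketch of that escaping using only the JDK. SchemaPayloadBuilder, escape, and payload are hypothetical names, and the sketch assumes the definition contains no control characters such as raw newlines. In real code a JSON library would be a safer choice.

```java
public class SchemaPayloadBuilder {
    // Escapes backslashes and double quotes so the text can sit inside a JSON string.
    // Assumes the input has no control characters (for example, raw newlines).
    static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Wraps a schema definition in the upload payload with empty properties.
    static String payload(String type, String schemaDefinition) {
        return "{\"type\":\"" + type + "\",\"schema\":\"" + escape(schemaDefinition)
                + "\",\"properties\":{}}";
    }

    public static void main(String[] args) {
        // A shortened version of the User record from the example above.
        String avro = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\","
                + "\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        System.out.println(payload("JSON", avro));
    }
}
```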

    Get the latest schema

    To get the latest schema for a topic, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the get subcommand.

        pulsar-admin schemas get <topic-name>

    Example output:

        {
          "version": 0,
          "type": "String",
          "timestamp": 0,
          "data": "string",
          "properties": {
            "property1": "string",
            "property2": "string"
          }
        }

    Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

    Here is an example of a response, which is returned in JSON format.

        {
          "version": "<the-version-number-of-the-schema>",
          "type": "<the-schema-type>",
          "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
          "data": "<an-utf8-encoded-string-of-schema-definition-data>",
          "properties": {} // the properties associated with the schema
        }

    The method on the PulsarAdmin client (accessed through admin.schemas()) is:

        SchemaInfo getSchemaInfo(String topic)

    Here is an example of getting a SchemaInfo:

        PulsarAdmin admin = …;
        SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");

    Get a specific schema

    To get a specific version of a schema, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the get subcommand.

        pulsar-admin schemas get <topic-name> --version <version>

    Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version

    Here is an example of a response, which is returned in JSON format.

        {
          "version": "<the-version-number-of-the-schema>",
          "type": "<the-schema-type>",
          "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
          "data": "<an-utf8-encoded-string-of-schema-definition-data>",
          "properties": {} // the properties associated with the schema
        }

    The method on the PulsarAdmin client (accessed through admin.schemas()) is:

        SchemaInfo getSchemaInfo(String topic, long version)

    Here is an example of getting a specific version of a SchemaInfo:

        PulsarAdmin admin = …;
        SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);

    Extract a schema

    To extract (provide) a schema via a topic, use the following method.

    • Admin CLI

    Use the extract subcommand.

        pulsar-admin schemas extract --classname <class-name> --jar <absolute-jar-path> --type <type-name>

    Delete a schema

    Note

    The delete action always deletes all versions of the schema registered for a topic.

    To delete a schema for a topic, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the delete subcommand.

        pulsar-admin schemas delete <topic-name>

    Send a DELETE request to this endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

    Here is an example of a response returned in JSON format.

        {
          "version": "<the-latest-version-number-of-the-schema>"
        }

    The method on the PulsarAdmin client (accessed through admin.schemas()) is:

        void deleteSchema(String topic)

    Here is an example of deleting a schema.

        PulsarAdmin admin = …;
        admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");

    Manage schema AutoUpdate

    Enable schema AutoUpdate

    To enable schema auto-update at the namespace level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the set-is-allow-auto-update-schema subcommand.

        bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace

    Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema

    The post payload is in JSON format.

        {
          "isAllowAutoUpdateSchema": true
        }

    Here is an example of enabling schema auto-update for a tenant/namespace.

        admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-ns", true);

    Disable schema AutoUpdate

    Note

    When schema auto-update is disabled, you can only register a new schema.

    To disable schema auto-update at the namespace level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the set-is-allow-auto-update-schema subcommand.

        bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

    Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema

    The post payload is in JSON format.

        {
          "isAllowAutoUpdateSchema": false
        }

    Here is an example of disabling schema auto-update for a tenant/namespace.

        admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-ns", false);

    Manage schema validation enforcement

    Enable schema validation enforcement

    To enable schema validation enforcement at the cluster level, set isSchemaValidationEnforced to true in the conf/broker.conf file.

    To enable schema validation enforcement at the namespace level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the set-schema-validation-enforce subcommand.

        bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

    Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced

    The post payload is in JSON format.

        {
          "schemaValidationEnforced": true
        }

    Here is an example of enabling schema validation enforcement for a tenant/namespace.

        admin.namespaces().setSchemaValidationEnforced("my-tenant/my-ns", true);

    Disable schema validation enforcement

    To disable schema validation enforcement at the namespace level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the set-schema-validation-enforce subcommand.

        bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

    Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced

    The post payload is in JSON format.

        {
          "schemaValidationEnforced": false
        }

    Here is an example of disabling schema validation enforcement for a tenant/namespace.

        admin.namespaces().setSchemaValidationEnforced("my-tenant/my-ns", false);

    Manage schema compatibility strategy

    The schema compatibility check strategy configured at different levels has priority: topic level > namespace level > cluster level. In other words:

    • If you set the strategy at both topic and namespace levels, the topic-level strategy is used.
    • If you set the strategy at both namespace and cluster levels, the namespace-level strategy is used.
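The priority order can be pictured as a fallback chain. The following is a minimal sketch of that resolution logic only, not the broker's actual implementation. StrategyResolver and effective are hypothetical names, strategies are represented as plain strings, and the cluster-level value is assumed to always be present.

```java
import java.util.Optional;

public class StrategyResolver {
    // Picks the first configured level in priority order:
    // topic, then namespace, then cluster.
    static String effective(Optional<String> topic, Optional<String> namespace, String cluster) {
        return topic.or(() -> namespace).orElse(cluster);
    }

    public static void main(String[] args) {
        // Topic level wins over namespace and cluster levels.
        System.out.println(effective(Optional.of("ALWAYS_INCOMPATIBLE"), Optional.of("FULL"), "FULL"));
        // No topic-level strategy: the namespace level is used.
        System.out.println(effective(Optional.empty(), Optional.of("FULL"), "ALWAYS_INCOMPATIBLE"));
        // Nothing set on the topic or namespace: the cluster level is used.
        System.out.println(effective(Optional.empty(), Optional.empty(), "ALWAYS_INCOMPATIBLE"));
    }
}
```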

    Set schema compatibility strategy

    Set topic-level schema compatibility strategy

    To set a schema compatibility check strategy at the topic level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the pulsar-admin topicPolicies set-schema-compatibility-strategy command.

        pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>

    Send a PUT request to this endpoint: PUT /admin/v2/topics/:tenant/:namespace/:topic

        void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)

    Here is an example of setting a schema compatibility check strategy at the topic level.

        PulsarAdmin admin = …;
        admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);

    Set namespace-level schema compatibility strategy

    To set schema compatibility check strategy at the namespace level, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the pulsar-admin namespaces set-schema-compatibility-strategy command.

        pulsar-admin namespaces set-schema-compatibility-strategy options

    Send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy

    Use the setSchemaCompatibilityStrategy method.

        admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);

    Set cluster-level schema compatibility strategy

    To set schema compatibility check strategy at the cluster level, set schemaCompatibilityStrategy in the conf/broker.conf file.

    The following is an example:

        schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE

    Get schema compatibility strategy

    Get topic-level schema compatibility strategy

    To get the topic-level schema compatibility check strategy, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the pulsar-admin topicPolicies get-schema-compatibility-strategy command.

        pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>

    Send a GET request to this endpoint: GET /admin/v2/topics/:tenant/:namespace/:topic/schemaCompatibilityStrategy

        SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied)

    Here is an example of getting the topic-level schema compatibility check strategy.

        PulsarAdmin admin = …;
        // get the current applied schema compatibility strategy
        admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true);
        // only get the schema compatibility strategy from topic policies
        admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false);

    Get namespace-level schema compatibility strategy

    To get the namespace-level schema compatibility check strategy, you can use one of the following methods.

    • Admin CLI
    • REST API
    • Java

    Use the pulsar-admin namespaces get-schema-compatibility-strategy command.

        pulsar-admin namespaces get-schema-compatibility-strategy options

    Send a GET request to this endpoint: GET /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy

    Use the getSchemaCompatibilityStrategy method.

        // the getter takes only the namespace and returns the configured strategy
        admin.namespaces().getSchemaCompatibilityStrategy("test");