Understand schema

This section explains the basic concepts of Pulsar schema and provides additional references.

Schema definition

Pulsar schema is defined in a data structure called SchemaInfo. It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level.

The following is an example of the SchemaInfo of a string schema.

    {
        "name": "test-string-schema",
        "type": "STRING",
        "schema": "",
        "properties": {}
    }

The following table outlines the fields that each SchemaInfo consists of.

| Field | Description |
|---|---|
| name | Schema name (a string). |
| type | Schema type that determines how to serialize and deserialize the schema data. |
| schema | Schema data, which is a sequence of 8-bit unsigned bytes whose content is specific to the schema type. |
| properties | A user-defined string/string map that applications can use to carry any application-specific logic. |

Schema type

Pulsar supports various schema types, which are mainly divided into the following categories: primitive types and complex types.

Primitive type

The following table outlines the primitive types that Pulsar schema supports, and the conversions between schema types and language-specific primitive types.

| Primitive Type | Description | Java Type | Python Type | Go Type | C++ Type | C# Type |
|---|---|---|---|---|---|---|
| BOOLEAN | A binary value. | boolean | bool | bool | bool | bool |
| INT8 | An 8-bit signed integer. | byte | int | int8 | int8_t | byte |
| INT16 | A 16-bit signed integer. | short | int | int16 | int16_t | short |
| INT32 | A 32-bit signed integer. | int | int | int32 | int32_t | int |
| INT64 | A 64-bit signed integer. | long | int | int64 | int64_t | long |
| FLOAT | A single-precision (32-bit) IEEE 754 floating-point number. | float | float | float32 | float | float |
| DOUBLE | A double-precision (64-bit) IEEE 754 floating-point number. | double | double | float64 | double | double |
| BYTES | A sequence of 8-bit unsigned bytes. | byte[], ByteBuffer, ByteBuf | bytes | []byte | void * | byte[], ReadOnlySequence<byte> |
| STRING | A Unicode character sequence. | String | str | string | std::string | string |
| TIMESTAMP (DATE, TIME) | A logical type that represents a specific instant in time with millisecond precision. It stores the number of milliseconds since January 1, 1970, 00:00:00 GMT as an INT64 value. | java.sql.Timestamp (java.sql.Time, java.util.Date) | N/A | N/A | N/A | DateTime, TimeSpan |
| INSTANT | A single instantaneous point on the timeline with nanosecond precision. | java.time.Instant | N/A | N/A | N/A | N/A |
| LOCAL_DATE | An immutable date-time object that represents a date, often viewed as year-month-day. | java.time.LocalDate | N/A | N/A | N/A | N/A |
| LOCAL_TIME | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision. | java.time.LocalTime | N/A | N/A | N/A | N/A |
| LOCAL_DATE_TIME | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second. | java.time.LocalDateTime | N/A | N/A | N/A | N/A |

Note

Pulsar does not store any schema data in SchemaInfo for primitive types. Some of the primitive schema implementations can use the properties parameter to store implementation-specific tunable settings. For example, a string schema can use properties to store the encoding charset to serialize and deserialize strings.

For more instructions and examples, see Construct a string schema.
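To make the note above concrete, here is a minimal sketch of a string schema that carries no schema data and reads its charset from a properties map. It is a toy model, not Pulsar's implementation: the class name `StringSchemaSketch` and the property key `charset` are assumptions made for this example only.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Toy model of a primitive string schema: no schema data, only properties.
final class StringSchemaSketch {
    // Hypothetical property key, chosen for this example only.
    static final String CHARSET_KEY = "charset";

    private final Charset charset;

    StringSchemaSketch(Map<String, String> properties) {
        // Fall back to UTF-8 when no charset property is present.
        String name = properties.get(CHARSET_KEY);
        this.charset = (name == null) ? StandardCharsets.UTF_8 : Charset.forName(name);
    }

    // Serialize a string using the configured charset.
    byte[] encode(String value) {
        return value.getBytes(charset);
    }

    // Deserialize bytes using the same charset.
    String decode(byte[] bytes) {
        return new String(bytes, charset);
    }
}
```

A producer and a consumer that read the same property would agree on the charset without any schema data being stored.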

Complex type

The following table outlines the complex types that Pulsar schema supports:

| Complex Type | Description |
|---|---|
| KeyValue | Represents a complex key/value pair. |
| Struct | Represents structured data, including AvroBaseStructSchema, ProtobufNativeSchema and NativeAvroBytesSchema. |

KeyValue schema

KeyValue schema helps applications define schemas for both key and value. Pulsar stores the SchemaInfo of the key schema and the value schema together.

Pulsar provides the following methods to encode a single key/value pair in a message:

  • INLINE - The key and the value are encoded together in the message payload.
  • SEPARATED - The key is stored as the message key, while the value is stored as the message payload.

See Construct a key/value schema for more details.
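The difference between the two encodings can be pictured with a toy message model. This is a sketch under stated assumptions, not Pulsar's wire format: the `Message` class is hypothetical, and the length-prefixed layout used for INLINE is chosen purely for illustration.

```java
import java.nio.ByteBuffer;

// Toy model of a message: an optional key plus a payload.
final class KeyValueEncodingSketch {
    static final class Message {
        final byte[] key;      // null when the message carries no key
        final byte[] payload;

        Message(byte[] key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // INLINE: key and value travel together in the payload.
    // Assumed layout: [key length][key bytes][value length][value bytes].
    static Message inline(byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 4 + value.length);
        buf.putInt(key.length).put(key).putInt(value.length).put(value);
        return new Message(null, buf.array());
    }

    // SEPARATED: the key becomes the message key, the value is the payload.
    static Message separated(byte[] key, byte[] value) {
        return new Message(key, value);
    }
}
```

With SEPARATED, the key stays visible as the message key, whereas with INLINE it is only recoverable by decoding the payload.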

Struct schema

The following table outlines the struct types that Pulsar schema supports:

| Type | Description |
|---|---|
| AvroBaseStructSchema | Pulsar uses Avro Specification to declare the schema definition for AvroBaseStructSchema, which supports AvroSchema, JsonSchema, and ProtobufSchema. This allows Pulsar to use the same tools to manage schema definitions and to use different serialization or deserialization methods to handle data. |
| ProtobufNativeSchema | ProtobufNativeSchema is based on protobuf native descriptor. This allows Pulsar to use native protobuf-v3 to serialize or deserialize data and to use AutoConsume to deserialize data. |
| NativeAvroBytesSchema | NativeAvroBytesSchema wraps a native Avro schema type org.apache.avro.Schema. The result is a schema instance that accepts a serialized Avro payload without validating it against the wrapped Avro schema. |

When you migrate or ingest event or messaging data from external systems (such as Kafka and Cassandra), the data is often already serialized in Avro format. The applications producing the data typically have validated the data against their schemas (including compatibility checks) and stored them in a database or a dedicated service (such as schema registry). The schema of each serialized data record is usually retrievable by some metadata attached to that record. In such cases, a Pulsar producer doesn’t need to repeat the schema validation when sending the ingested events to a topic. All it needs to do is pass each message or event with its schema to Pulsar.

Pulsar provides the following methods to use the struct schema.

  • static
  • generic
  • SchemaDefinition

This example shows how to construct a struct schema with these methods and use it to produce and consume messages.

static

You can predefine the struct schema, which can be a POJO in Java, a struct in Go, or classes generated by Avro or Protobuf tools.

Example

Pulsar gets the schema definition from the predefined struct using an Avro library. The schema definition is the schema data stored as a part of the SchemaInfo.

  1. Create the User class to define the messages sent to Pulsar topics.

        // If you use Lombok:
        @Builder
        @AllArgsConstructor
        @NoArgsConstructor
        public static class User {
            public String name;
            public int age;
        }
        // If you don't use Lombok, add the constructors yourself:
        //
        // public static class User {
        //     String name;
        //     int age;
        //     public User() { }
        //     public User(String name, int age) { this.name = name; this.age = age; }
        // }

  2. Create a producer with a struct schema and send messages.

        // topicName stands for the name of your own topic.
        Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
                .topic(topicName).create();
        producer.newMessage().value(new User("pulsar-user", 1)).send();

  3. Create a consumer with a struct schema and receive messages.

        // subscriptionName stands for the name of your own subscription.
        Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))
                .topic(topicName).subscriptionName(subscriptionName).subscribe();
        User user = consumer.receive().getValue();

generic

Sometimes applications do not have predefined structs. In that case, you can use this method to define the schema and access the data.

You can define the struct schema using RecordSchemaBuilder, generate a generic struct using GenericRecordBuilder, and consume messages into GenericRecord.

Example

  1. Use RecordSchemaBuilder to build a schema.

        RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
        recordSchemaBuilder.field("intField").type(SchemaType.INT32);
        SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.generic(schemaInfo))
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();
        Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo))
                .topic(topicName)
                .create();

  2. Use RecordBuilder to build the struct records.

        // Schema.generic is the public-API way to obtain a generic schema from a SchemaInfo.
        GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
        // send message
        GenericRecord record = schema.newRecordBuilder().set("intField", 32).build();
        producer.newMessage().value(record).send();
        // receive message
        Message<GenericRecord> msg = consumer.receive();
        Assert.assertEquals(msg.getValue().getField("intField"), 32);

SchemaDefinition

You can define a SchemaDefinition to generate a struct schema.

Example

  1. Create the User class to define the messages sent to Pulsar topics.

        public static class User {
            public String name;
            public int age;
            public User(String name, int age) {
                this.name = name;
                this.age = age;
            }
            public User() {}
        }

  2. Create a producer with a SchemaDefinition and send messages.

        // topicName stands for the name of your own topic.
        SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
        Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition))
                .topic(topicName).create();
        producer.newMessage().value(new User("pulsar-user", 1)).send();

  3. Create a consumer with a SchemaDefinition schema and receive messages.

        // subscriptionName stands for the name of your own subscription.
        SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
        Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition))
                .topic(topicName).subscriptionName(subscriptionName).subscribe();
        User user = consumer.receive().getValue();

Auto Schema

If you cannot know the schema type of a Pulsar topic in advance, you can use AUTO schemas to produce generic records to brokers or consume them from brokers.

Auto schema contains two categories:

  • AUTO_PRODUCE transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the outbound bytes are compatible with the schema of the topic. For more instructions, see Construct an AUTO_PRODUCE schema.
  • AUTO_CONSUME transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the outbound bytes are compatible with the consumer. In other words, messages are deserialized into language-specific GenericRecord objects using the SchemaInfo retrieved from brokers. For more instructions, see Construct an AUTO_CONSUME schema.

Schema validation enforcement

Schema validation enforcement enables brokers to reject producers/consumers without a schema.

By default, schema validation enforcement is disabled for producers (isSchemaValidationEnforced=false), which means:

  • A producer without a schema can produce any messages to a topic with schemas, which may result in producing trash data to the topic.
  • Clients that don’t support schema are allowed to produce messages to a topic with schemas.

For how to enable schema validation enforcement, see Manage schema validation.

Schema evolution

Schemas store the details of attributes and types. To satisfy new business needs, schemas undergo evolution over time with versioning.

Note

Schema evolution only applies to Avro, JSON, Protobuf, and ProtobufNative schemas.

Schema evolution may impact existing consumers. The following control measures have been designed to serve schema evolution and ensure the downstream consumers can seamlessly handle schema evolution:

  • Schema compatibility check
  • Schema AutoUpdate

For further reading about schema evolution, see Avro documentation and Protobuf documentation.

Schema versioning

Each SchemaInfo stored with a topic has a version. The schema version manages schema changes happening within a topic.

Messages produced with SchemaInfo are tagged with a schema version. When a message is consumed by a Pulsar client, the client can use the schema version to retrieve the corresponding SchemaInfo and use the correct schema to deserialize data. Once a version is assigned to or fetched from a schema, all subsequent messages produced by that producer are tagged with the appropriate version.

Suppose you are using a Pulsar Java client to create a producer and send messages.

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
    Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
            .topic("sensor-data")
            .sendTimeout(3, TimeUnit.SECONDS)
            .create();

The table below outlines the possible scenarios when this connection attempt occurs and the result of each scenario:

| Scenario | Result |
|---|---|
| No schema exists for the topic. | (1) The producer is created with the given schema. (2) The schema is transmitted to the broker and stored since there is no existing schema. (3) Any consumer created using the same schema or topic can consume messages from the sensor-data topic. |
| A schema already exists. The producer connects using the same schema that is already stored. | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in BookKeeper but then determines that it’s already stored, so it is used to tag produced messages. |
| A schema already exists. The producer connects using a new schema that is compatible. | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). |
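The three scenarios above can be sketched as a small in-memory schema history. This is a toy model and not the broker's code: the class `TopicSchemaHistory` is hypothetical, schema definitions are compared as plain strings, version numbers starting at 0 are an assumption of the sketch, and the compatibility check is assumed to have already passed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the schema versions stored for a single topic.
final class TopicSchemaHistory {
    private final List<String> definitions = new ArrayList<>();

    // Returns the version that messages from the connecting producer are tagged with.
    long register(String definition) {
        int existing = definitions.indexOf(definition);
        if (existing >= 0) {
            // The schema is already stored: reuse its version.
            return existing;
        }
        // First schema, or a new compatible schema: store it as the current version.
        definitions.add(definition);
        return definitions.size() - 1;
    }

    // Version of the most recently stored schema, or -1 if none is stored.
    long latestVersion() {
        return definitions.size() - 1;
    }
}
```

Registering the same definition twice yields the same version, while a new definition receives the next version number.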

Schema compatibility check

The purpose of schema compatibility check is to ensure that existing consumers can process the introduced messages.

When receiving a SchemaInfo from producers, brokers recognize the schema type and deploy the schema compatibility checker (schemaRegistryCompatibilityCheckers) for that schema type to check if the SchemaInfo is compatible with the schema of the topic by applying the configured compatibility check strategy.

The default value of schemaRegistryCompatibilityCheckers in the conf/broker.conf file is as follows.

    schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck

Each schema type corresponds to one instance of the schema compatibility checker. Avro, JSON, and Protobuf schemas have their own compatibility checkers, while all the other schema types share the default compatibility checker that disables the schema evolution.

Schema compatibility check strategy

Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is the oldest and V3 is the latest. The following table outlines the eight schema compatibility check strategies and how each one works.

| Compatibility check strategy | Definition | Changes allowed | Check against which schema |
|---|---|---|---|
| ALWAYS_COMPATIBLE | Disable schema compatibility check. | All changes are allowed | All previous versions |
| ALWAYS_INCOMPATIBLE | Disable schema evolution, that is, any schema change is rejected. | No change is allowed | N/A |
| BACKWARD | Consumers using schema V3 can process data written by producers using the last schema version V2. | Add optional fields; delete fields | Latest version |
| BACKWARD_TRANSITIVE | Consumers using schema V3 can process data written by producers using all previous schema versions V2 and V1. | Add optional fields; delete fields | All previous versions |
| FORWARD | Consumers using the last schema version V2 can process data written by producers using a new schema V3, even though they may not be able to use the full capabilities of the new schema. | Add fields; delete optional fields | Latest version |
| FORWARD_TRANSITIVE | Consumers using all previous schema versions V2 or V1 can process data written by producers using a new schema V3. | Add fields; delete optional fields | All previous versions |
| FULL | Schemas are both backward and forward compatible. Consumers using the last schema V2 can process data written by producers using the new schema V3. Consumers using the new schema V3 can process data written by producers using the last schema V2. | Modify optional fields | Latest version |
| FULL_TRANSITIVE | Backward and forward compatible among schema V3, V2, and V1. Consumers using the schema V3 can process data written by producers using schema V2 and V1. Consumers using the schema V2 or V1 can process data written by producers using the schema V3. | Modify optional fields | All previous versions |
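The "changes allowed" and "check against which schema" columns can be illustrated with a field-level toy model. This is a sketch under simplifying assumptions, not Pulsar's or Avro's checker: a schema is reduced to a map from field name to whether the field is optional (has a default), field types are ignored, and the class `CompatibilitySketch` and its enum are hypothetical. Only the six BACKWARD, FORWARD, and FULL variants are modeled.

```java
import java.util.List;
import java.util.Map;

// Toy model: a schema maps each field name to "is optional" (has a default value).
final class CompatibilitySketch {
    enum Strategy {
        BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE
    }

    // A reader can process a writer's data when every field the reader expects
    // is either present in the written data or optional.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }

    // history is ordered oldest first; its last element is the latest version.
    static boolean isCompatible(Strategy strategy,
                                Map<String, Boolean> candidate,
                                List<Map<String, Boolean>> history) {
        if (history.isEmpty()) {
            return true;
        }
        String name = strategy.name();
        // Transitive strategies check all previous versions, the others only the latest.
        List<Map<String, Boolean>> targets = name.endsWith("_TRANSITIVE")
                ? history
                : history.subList(history.size() - 1, history.size());
        boolean backward = name.startsWith("BACKWARD") || name.startsWith("FULL");
        boolean forward = name.startsWith("FORWARD") || name.startsWith("FULL");
        for (Map<String, Boolean> existing : targets) {
            // Backward: the new schema must read data written with the existing one.
            if (backward && !canRead(candidate, existing)) {
                return false;
            }
            // Forward: the existing schema must read data written with the new one.
            if (forward && !canRead(existing, candidate)) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, adding a required field fails the backward check but passes the forward check, which mirrors the "add optional fields" and "add fields" entries in the table.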

Tip

  • The default schema compatibility check strategy varies depending on schema types.
    • For Avro and JSON, the default one is FULL.
    • For others, the default one is ALWAYS_INCOMPATIBLE.
  • For more instructions about how to set the strategy, see Manage schemas.

Schema AutoUpdate

By default, schema AutoUpdate is enabled. When a schema passes the schema compatibility check, it is automatically registered with the topic that the producer writes to.

Producer side

For a producer, the AutoUpdate happens in the following cases:

  • If a topic doesn’t have a schema (meaning the data is in raw bytes), Pulsar registers the schema automatically.
  • If a topic has a schema and the producer doesn’t carry any schema (meaning it produces raw bytes):
    • If schema validation enforcement is disabled (schemaValidationEnforced=false) in the namespace that the topic belongs to, the producer is allowed to connect to the topic and produce data.
    • Otherwise, the producer is rejected.
  • If a topic has a schema and the producer carries a schema, see How schema works on producer side for more information.
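The producer-side cases above can be summarized as a small decision function. This is a minimal sketch, not broker code: the class, method, and enum names are hypothetical, and the case where neither the topic nor the producer has a schema is not covered by the list above, so treating it as "allowed" is an assumption of this example.

```java
// Toy decision table for a producer connecting to a topic.
final class ProducerConnectSketch {
    enum Outcome { REGISTER_SCHEMA, ALLOW_RAW_BYTES, REJECT, RUN_COMPATIBILITY_CHECK }

    static Outcome onProducerConnect(boolean topicHasSchema,
                                     boolean producerHasSchema,
                                     boolean schemaValidationEnforced) {
        if (!topicHasSchema) {
            // No schema on the topic yet: the producer's schema, if any, is registered.
            // Assumption: a schemaless producer on a schemaless topic is allowed.
            return producerHasSchema ? Outcome.REGISTER_SCHEMA : Outcome.ALLOW_RAW_BYTES;
        }
        if (!producerHasSchema) {
            // Topic has a schema, producer sends raw bytes: depends on enforcement.
            return schemaValidationEnforced ? Outcome.REJECT : Outcome.ALLOW_RAW_BYTES;
        }
        // Both sides have a schema: the compatibility check decides.
        return Outcome.RUN_COMPATIBILITY_CHECK;
    }
}
```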

Consumer side

For a consumer, the AutoUpdate happens in the following cases:

  • If a consumer connects to a topic without a schema (meaning it consumes raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.
  • If a consumer connects to a topic with a schema, see How schema works on consumer side for more information.

Order of upgrading clients

To adapt to schema evolution and auto-update, you need to upgrade your client applications accordingly. The upgrade order may vary depending on the configured schema compatibility check strategy.

The following table outlines the mapping between the schema compatibility check strategy and the upgrade order of clients.

| Compatibility check strategy | Upgrade order | Description |
|---|---|---|
| ALWAYS_COMPATIBLE | Any order | The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in any order. |
| ALWAYS_INCOMPATIBLE | N/A | The schema evolution is disabled. |
| BACKWARD, BACKWARD_TRANSITIVE | Consumer first | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, upgrade all consumers first, and then start producing new data. |
| FORWARD, FORWARD_TRANSITIVE | Producer first | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, upgrade all producers first to use the new schema, ensure that the data already produced using the old schemas is not available to consumers, and then upgrade the consumers. |
| FULL, FULL_TRANSITIVE | Any order | It is guaranteed that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in any order. |
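For rollout tooling, the table above can be encoded as a simple lookup. The sketch below uses its own hypothetical enums rather than any Pulsar class, so treat the names as illustrative only.

```java
// Toy lookup from compatibility check strategy to client upgrade order.
final class UpgradeOrderSketch {
    enum Strategy {
        ALWAYS_COMPATIBLE, ALWAYS_INCOMPATIBLE,
        BACKWARD, BACKWARD_TRANSITIVE,
        FORWARD, FORWARD_TRANSITIVE,
        FULL, FULL_TRANSITIVE
    }

    enum UpgradeOrder { ANY_ORDER, CONSUMERS_FIRST, PRODUCERS_FIRST, NOT_APPLICABLE }

    static UpgradeOrder upgradeOrder(Strategy strategy) {
        switch (strategy) {
            case BACKWARD:
            case BACKWARD_TRANSITIVE:
                // Old consumers may not read new data: upgrade consumers first.
                return UpgradeOrder.CONSUMERS_FIRST;
            case FORWARD:
            case FORWARD_TRANSITIVE:
                // New consumers may not read old data: upgrade producers first.
                return UpgradeOrder.PRODUCERS_FIRST;
            case ALWAYS_INCOMPATIBLE:
                // Schema evolution is disabled, so there is nothing to roll out.
                return UpgradeOrder.NOT_APPLICABLE;
            default:
                // ALWAYS_COMPATIBLE, FULL, FULL_TRANSITIVE
                return UpgradeOrder.ANY_ORDER;
        }
    }
}
```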