Understand schema

This chapter explains the basic concepts of Pulsar schema, focuses on the topics of particular importance, and provides additional background.

SchemaInfo

Pulsar schema is defined in a data structure called SchemaInfo.

The SchemaInfo is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level.

A SchemaInfo consists of the following fields:

FieldDescription
nameSchema name (a string).
typeSchema type, which determines how to interpret the schema data.
  • Predefined schema: see here.
  • Customized schema: it is left as an empty string.
  • schemapayload)Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific.
    propertiesIt is a user defined properties as a string/string map. Applications can use this bag for carrying any application specific logics. Possible properties might be the Git hash associated with the schema, an environment string like dev or prod.

    Example

    This is the SchemaInfo of a string.

    1. {
    2. "name": "test-string-schema",
    3. "type": "STRING",
    4. "schema": "",
    5. "properties": {}
    6. }

    Schema type

    Pulsar supports various schema types, which are mainly divided into two categories:

    • Primitive type

    • Complex type

    Primitive type

    Currently, Pulsar supports the following primitive types:

    Primitive TypeDescription
    BOOLEANA binary value
    INT8A 8-bit signed integer
    INT16A 16-bit signed integer
    INT32A 32-bit signed integer
    INT64A 64-bit signed integer
    FLOATA single precision (32-bit) IEEE 754 floating-point number
    DOUBLEA double-precision (64-bit) IEEE 754 floating-point number
    BYTESA sequence of 8-bit unsigned bytes
    STRINGA Unicode character sequence
    TIMESTAMP (DATE, TIME)A logic type represents a specific instant in time with millisecond precision.
    It stores the number of milliseconds since January 1, 1970, 00:00:00 GMT as an INT64 value
    INSTANTA single instantaneous point on the time-line with nanoseconds precision
    LOCAL_DATEAn immutable date-time object that represents a date, often viewed as year-month-day
    LOCAL_TIMEAn immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision.
    LOCAL_DATE_TIMEAn immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second

    For primitive types, Pulsar does not store any schema data in SchemaInfo. The type in SchemaInfo is used to determine how to serialize and deserialize the data.

    Some of the primitive schema implementations can use properties to store implementation-specific tunable settings. For example, a string schema can use properties to store the encoding charset to serialize and deserialize strings.

    The conversions between Pulsar schema types and language-specific primitive types are as below.

    Schema TypeJava TypePython TypeGo Type
    BOOLEANbooleanboolbool
    INT8byteint8
    INT16shortint16
    INT32intint32
    INT64longint64
    FLOATfloatfloatfloat32
    DOUBLEdoublefloatfloat64
    BYTESbyte[], ByteBuffer, ByteBufbytes[]byte
    STRINGstringstrstring
    TIMESTAMPjava.sql.Timestamp
    TIMEjava.sql.Time
    DATEjava.util.Date
    INSTANTjava.time.Instant
    LOCAL_DATEjava.time.LocalDate
    LOCAL_TIMEjava.time.LocalDateTime
    LOCAL_DATE_TIMEjava.time.LocalTime

    Example

    This example demonstrates how to use a string schema.

    1. Create a producer with a string schema and send messages.

      1. Producer<String> producer = client.newProducer(Schema.STRING).create();
      2. producer.newMessage().value("Hello Pulsar!").send();
    1. Create a consumer with a string schema and receive messages.

      1. Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
      2. consumer.receive();

    Complex type

    Currently, Pulsar supports the following complex types:

    Complex TypeDescription
    keyvalueRepresents a complex type of a key/value pair.
    structHandles structured data. It supports AvroBaseStructSchema and ProtobufNativeSchema.

    keyvalue

    Keyvalue schema helps applications define schemas for both key and value.

    For SchemaInfo of keyvalue schema, Pulsar stores the SchemaInfo of key schema and the SchemaInfo of value schema together.

    Pulsar provides the following methods to encode a key/value pair in messages:

    • INLINE

    • SEPARATED

    You can choose the encoding type when constructing the key/value schema.

    • INLINE
    • SEPARATED

    Key/value pairs are encoded together in the message payload.

    Key is encoded in the message key and the value is encoded in the message payload.

    Example

    This example shows how to construct a key/value schema and then use it to produce and consume messages.

    1. Construct a key/value schema with INLINE encoding type.

      1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
      2. Schema.INT32,
      3. Schema.STRING,
      4. KeyValueEncodingType.INLINE
      5. );
    1. Optionally, construct a key/value schema with SEPARATED encoding type.

      1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
      2. Schema.INT32,
      3. Schema.STRING,
      4. KeyValueEncodingType.SEPARATED
      5. );
    1. Produce messages using a key/value schema.

      1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
      2. Schema.INT32,
      3. Schema.STRING,
      4. KeyValueEncodingType.SEPARATED
      5. );
      6. Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
      7. .topic(TOPIC)
      8. .create();
      9. final int key = 100;
      10. final String value = "value-100";
      11. // send the key/value message
      12. producer.newMessage()
      13. .value(new KeyValue(key, value))
      14. .send();
    1. Consume messages using a key/value schema.

      1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
      2. Schema.INT32,
      3. Schema.STRING,
      4. KeyValueEncodingType.SEPARATED
      5. );
      6. Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
      7. ...
      8. .topic(TOPIC)
      9. .subscriptionName(SubscriptionName).subscribe();
      10. // receive key/value pair
      11. Message<KeyValue<Integer, String>> msg = consumer.receive();
      12. KeyValue<Integer, String> kv = msg.getValue();

    struct

    This section describes the details of type and usage of the struct schema.

    Type

    struct schema supports AvroBaseStructSchema and ProtobufNativeSchema.

    TypeDescription
    AvroBaseStructSchemaPulsar uses Avro Specification to declare the schema definition for AvroBaseStructSchema, which supports AvroSchema, JsonSchema, and ProtobufSchema.

    This allows Pulsar:
    - to use the same tools to manage schema definitions
    - to use different serialization or deserialization methods to handle data
    ProtobufNativeSchemaProtobufNativeSchema is based on protobuf native Descriptor.

    This allows Pulsar:
    - to use native protobuf-v3 to serialize or deserialize data
    - to use AutoConsume to deserialize data.
    Usage

    Pulsar provides the following methods to use the struct schema:

    • static

    • generic

    • SchemaDefinition

    • static

    • generic
    • SchemaDefinition

    You can predefine the struct schema, which can be a POJO in Java, a struct in Go, or classes generated by Avro or Protobuf tools.

    Example

    Pulsar gets the schema definition from the predefined struct using an Avro library. The schema definition is the schema data stored as a part of the SchemaInfo.

    1. Create the User class to define the messages sent to Pulsar topics.

      1. @Builder
      2. @AllArgsConstructor
      3. @NoArgsConstructor
      4. public static class User {
      5. String name;
      6. int age;
      7. }
    1. Create a producer with a struct schema and send messages.

      1. Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
      2. producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
    1. Create a consumer with a struct schema and receive messages

      1. Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
      2. User user = consumer.receive();

    Sometimes applications do not have pre-defined structs, and you can use this method to define schema and access data.

    You can define the struct schema using the GenericSchemaBuilder, generate a generic struct using GenericRecordBuilder and consume messages into GenericRecord.

    Example

    1. Use RecordSchemaBuilder to build a schema.

      1. RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
      2. recordSchemaBuilder.field("intField").type(SchemaType.INT32);
      3. SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
      4. Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
    1. Use RecordBuilder to build the struct records.

      1. producer.newMessage().value(schema.newRecordBuilder()
      2. .set("intField", 32)
      3. .build()).send();

    You can define the schemaDefinition to generate a struct schema.

    Example

    1. Create the User class to define the messages sent to Pulsar topics.

      1. @Builder
      2. @AllArgsConstructor
      3. @NoArgsConstructor
      4. public static class User {
      5. String name;
      6. int age;
      7. }
    1. Create a producer with a SchemaDefinition and send messages.

      1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
      2. Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition)).create();
      3. producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
    1. Create a consumer with a SchemaDefinition schema and receive messages

      1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
      2. Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
      3. User user = consumer.receive().getValue();

    Auto Schema

    If you don’t know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers.

    Auto Schema TypeDescription
    AUTO_PRODUCEThis is useful for transferring data from a producer to a Pulsar topic that has a schema.
    AUTO_CONSUMEThis is useful for transferring data from a Pulsar topic that has a schema to a consumer.

    AUTO_PRODUCE

    AUTO_PRODUCE schema helps a producer validate whether the bytes sent by the producer is compatible with the schema of a topic.

    Example

    Suppose that:

    • You have a producer processing messages from a Kafka topic K.

    • You have a Pulsar topic P, and you do not know its schema type.

    • Your application reads the messages from K and writes the messages to P.

    In this case, you can use AUTO_PRODUCE to verify whether the bytes produced by K can be sent to P or not.

    1. Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
    2. .create();
    3. byte[] kafkaMessageBytes = ;
    4. pulsarProducer.produce(kafkaMessageBytes);

    AUTO_CONSUME

    AUTO_CONSUME schema helps a Pulsar topic validate whether the bytes sent by a Pulsar topic is compatible with a consumer, that is, the Pulsar topic deserializes messages into language-specific objects using the SchemaInfo retrieved from broker-side.

    Currently, AUTO_CONSUME supports AVRO, JSON and ProtobufNativeSchema schemas. It deserializes messages into GenericRecord.

    Example

    Suppose that:

    • You have a Pulsar topic P.

    • You have a consumer (for example, MySQL) receiving messages from the topic P.

    • Your application reads the messages from P and writes the messages to MySQL.

    In this case, you can use AUTO_CONSUME to verify whether the bytes produced by P can be sent to MySQL or not.

    1. Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
    2. .subscribe();
    3. Message<GenericRecord> msg = consumer.receive() ;
    4. GenericRecord record = msg.getValue();

    Native Avro Schema

    When migrating or ingesting event or message data from external systems (such as Kafka and Cassandra), the events are often already serialized in Avro format. The applications producing the data typically have validated the data against their schemas (including compatibility checks) and stored them in a database or a dedicated service (such as a schema registry). The schema of each serialized data record is usually retrievable by some metadata attached to that record. In such cases, a Pulsar producer doesn’t need to repeat the schema validation step when sending the ingested events to a topic. All it needs to do is passing each message or event with its schema to Pulsar.

    Hence, we provide Schema.NATIVE_AVRO to wrap a native Avro schema of type org.apache.avro.Schema. The result is a schema instance of Pulsar that accepts a serialized Avro payload without validating it against the wrapped Avro schema.

    Example

    1. org.apache.avro.Schema nativeAvroSchema = ;
    2. Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
    3. byte[] content = ;
    4. producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();

    Schema version

    Each SchemaInfo stored with a topic has a version. Schema version manages schema changes happening within a topic.

    Messages produced with a given SchemaInfo is tagged with a schema version, so when a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding SchemaInfo and then use the SchemaInfo to deserialize data.

    Schemas are versioned in succession. Schema storage happens in a broker that handles the associated topics so that version assignments can be made.

    Once a version is assigned/fetched to/for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.

    Example

    The following example illustrates how the schema version works.

    Suppose that a Pulsar Java client created using the code below attempts to connect to Pulsar and begins to send messages:

    1. PulsarClient client = PulsarClient.builder()
    2. .serviceUrl("pulsar://localhost:6650")
    3. .build();
    4. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    5. .topic("sensor-data")
    6. .sendTimeout(3, TimeUnit.SECONDS)
    7. .create();

    The table below lists the possible scenarios when this connection attempt occurs and what happens in each scenario:

    ScenarioWhat happens
  • No schema exists for the topic.
  • (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the SensorReading schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the sensor-data topic.
  • A schema already exists.
  • The producer connects using the same schema that is already stored.
  • (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in BookKeeper but then determines that it’s already stored, so it is used to tag produced messages.

    How does schema work

    Pulsar schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level).

    Producers and consumers upload schemas to brokers, so Pulsar schemas work on the producer side and the consumer side.

    Producer side

    This diagram illustrates how does schema work on the Producer side.

    Schema works at the producer side

    1. The application uses a schema instance to construct a producer instance.

      The schema instance defines the schema for the data being produced using the producer instance.

      Take AVRO as an example, Pulsar extract schema definition from the POJO class and construct the SchemaInfo that the producer needs to pass to a broker when it connects.

    2. The producer connects to the broker with the SchemaInfo extracted from the passed-in schema instance.

    3. The broker looks up the schema in the schema storage to check if it is already a registered schema.

    4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer.

    5. If no, the broker verifies whether a schema can be automatically created in this namespace:

    • If isAllowAutoUpdateSchema sets to true, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.
    • If isAllowAutoUpdateSchema sets to false, then a schema can not be created, and the producer is rejected to connect to the broker.

    Tip:

    isAllowAutoUpdateSchema can be set via Pulsar admin API or REST API.

    For how to set isAllowAutoUpdateSchema via Pulsar admin API, see Manage AutoUpdate Strategy.

    1. If the schema is allowed to be updated, then the compatible strategy check is performed.
    • If the schema is compatible, the broker stores it and returns the schema version to the producer.

      All the messages produced by this producer are tagged with the schema version.

    • If the schema is incompatible, the broker rejects it.

    Consumer side

    This diagram illustrates how does Schema work on the consumer side.

    Schema works at the consumer side

    1. The application uses a schema instance to construct a consumer instance.

      The schema instance defines the schema that the consumer uses for decoding messages received from a broker.

    2. The consumer connects to the broker with the SchemaInfo extracted from the passed-in schema instance.

    3. The broker determines whether the topic has one of them (a schema/data/a local consumer and a local producer).

    4. If a topic does not have all of them (a schema/data/a local consumer and a local producer):

      • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.
      • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.
    5. If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.

      • If the schema passes the compatibility check, then the consumer is connected to the broker.
      • If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker.
    6. The consumer receives messages from the broker.

      If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the SchemaInfo of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages.