Work with schema

Get started with schema

For an overview of Pulsar schema and language-specific code examples, see Schema - Overview and Schema - Get Started.

Work with Python schema

Working with schema in Python differs slightly from other languages. This section provides the reference and examples for using the Python client with schema.

Supported schema types

You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.

| Schema | Notes |
| --- | --- |
| BytesSchema | Gets the raw payload as a bytes object. No serialization/deserialization is performed. This is the default schema mode. |
| StringSchema | Encodes/decodes the payload as a UTF-8 string. Uses str objects. |
| JsonSchema | Requires a record definition. Serializes the record into a standard JSON payload. |
| AvroSchema | Requires a record definition. Serializes the record in Avro format. |

Schema definition reference

A schema is defined through a class that inherits from pulsar.schema.Record.

The class can have fields that are either of a pulsar.schema.Field type or another nested Record. All field types are defined in the pulsar.schema package and match the Avro field types.

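| Field Type | Python Type | Notes |
| --- | --- | --- |
| Boolean | bool | |
| Integer | int | |
| Long | int | |
| Float | float | |
| Double | float | |
| Bytes | bytes | |
| String | str | |
| Array | list | Requires the item type to be specified. |
| Map | dict | Keys are always String. Requires the value type to be specified. |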

Additionally, any Python Enum type can be used as a valid field type.

Field parameters

When adding a field, you can use these parameters in the constructor.

| Argument | Default | Notes |
| --- | --- | --- |
| default | None | Set a default value for the field, such as a = Integer(default=5). |
| required | False | Mark the field as "required". It is set in the schema accordingly. |

Schema definition examples

Simple definition

    from pulsar.schema import Record, String, Integer, Array, Map

    class Example(Record):
        a = String()
        b = Integer()
        c = Array(String())
        i = Map(String())

Using enums

    from enum import Enum
    from pulsar.schema import Record, String

    class Color(Enum):
        red = 1
        green = 2
        blue = 3

    class Example(Record):
        name = String()
        color = Color

Complex types

    from pulsar.schema import Record, Integer, Long, String

    class MySubRecord(Record):
        x = Integer()
        y = Long()
        z = String()

    class Example(Record):
        a = String()
        sub = MySubRecord()
Set namespace for Avro schema

Set the namespace for the Avro Record schema using the special field _avro_namespace.

    from pulsar.schema import Record, String, Integer

    class NamespaceDemo(Record):
        _avro_namespace = 'xxx.xxx.xxx'
        x = String()
        y = Integer()

The resulting schema definition looks like this.

    {
      "name": "NamespaceDemo", "namespace": "xxx.xxx.xxx", "type": "record", "fields": [
        {"name": "x", "type": ["null", "string"]},
        {"name": "y", "type": ["null", "int"]}
      ]
    }

Declare and validate schema

Before the producer is created, the Pulsar broker validates that the existing topic schema is of the correct type and that its format is compatible with the schema definition of the class. If the topic schema is incompatible with the schema definition, an exception is raised when the producer is created.

Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.

Similarly, a consumer or reader returns an object that is an instance of the schema record class rather than raw bytes.

Example

    # Assumes `client` is a connected pulsar.Client and `Example` is a
    # Record subclass with fields a, b, and c.
    from pulsar.schema import AvroSchema

    consumer = client.subscribe(
        topic='my-topic',
        subscription_name='my-subscription',
        schema=AvroSchema(Example))

    while True:
        msg = consumer.receive()
        ex = msg.value()
        try:
            print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)