Get started
This hands-on tutorial provides instructions and examples on how to construct schemas. For instructions on administrative tasks, see Manage schema.
Construct a schema
bytes
This example demonstrates how to construct a bytes schema using language-specific clients and use it to produce and consume messages.
- Java
- C++
- Python
- Go
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("my-topic")
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value("message".getBytes()).send();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
Producer producer;
client.createProducer("topic-bytes", ProducerConfiguration().setSchema(schemaInfo), producer);
std::array<char, 1024> buffer;
producer.send(MessageBuilder().setContent(buffer.data(), buffer.size()).build());
Consumer consumer;
Result res = client.subscribe("topic-bytes", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg, 3000);
producer = client.create_producer(
'bytes-schema-topic',
schema=BytesSchema())
producer.send(b"Hello")
consumer = client.subscribe(
'bytes-schema-topic',
'sub',
schema=BytesSchema())
msg = consumer.receive()
data = msg.value()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewBytesSchema(nil),
})
id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: []byte("message"),
})
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewBytesSchema(nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
string
This example demonstrates how to construct a string schema using language-specific clients and use it to produce and consume messages.
- Java
- C++
- Python
- Go
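// A topic (and, for consumers, a subscription name) is required before create()/subscribe().
Producer<String> producer = client.newProducer(Schema.STRING).topic("my-topic").create();
producer.newMessage().value("Hello Pulsar!").send();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic").subscriptionName("my-sub").subscribe();
Message<String> message = consumer.receive();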
SchemaInfo schemaInfo = SchemaInfo(SchemaType::STRING, "String", "");
Producer producer;
client.createProducer("topic-string", ProducerConfiguration().setSchema(schemaInfo), producer);
producer.send(MessageBuilder().setContent("message").build());
Consumer consumer;
client.subscribe("topic-string", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg, 3000);
producer = client.create_producer(
'string-schema-topic',
schema=StringSchema())
producer.send("Hello")
consumer = client.subscribe(
'string-schema-topic',
'sub',
schema=StringSchema())
msg = consumer.receive()
text = msg.value()  # named "text" to avoid shadowing the built-in str
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewStringSchema(nil),
})
id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: "message",
})
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewStringSchema(nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
msg, err := consumer.Receive(context.Background())
key/value
This example shows how to construct a key/value schema using language-specific clients and use it to produce and consume messages.
- Java
- C++
Construct a key/value schema with INLINE encoding type.
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);
Alternatively, construct a key/value schema with SEPARATED encoding type.
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
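The two encoding types differ in where the key travels: with INLINE, the key and value are encoded together in the message payload; with SEPARATED, the key is stored as the message key and only the value goes into the payload. The Python sketch below models that difference with plain bytes. It is an illustration, not the client's code: the function names are hypothetical, and the 4-byte big-endian length prefixes are an assumed layout.

```python
import struct


def encode_inline(key: bytes, value: bytes) -> bytes:
    """INLINE: key and value share one payload, each behind a 4-byte length (assumed layout)."""
    return struct.pack(">i", len(key)) + key + struct.pack(">i", len(value)) + value


def decode_inline(payload: bytes) -> tuple:
    """Split an INLINE payload back into (key, value)."""
    (key_len,) = struct.unpack_from(">i", payload, 0)
    key = payload[4:4 + key_len]
    (value_len,) = struct.unpack_from(">i", payload, 4 + key_len)
    value_start = 8 + key_len
    return key, payload[value_start:value_start + value_len]


def encode_separated(key: bytes, value: bytes) -> tuple:
    """SEPARATED: return (message_key, payload); the payload carries only the value."""
    return key, value


key = struct.pack(">i", 100)         # an INT32 key, as in the key/value examples in this section
value = "value-100".encode("utf-8")  # a STRING value

inline_payload = encode_inline(key, value)
message_key, separated_payload = encode_separated(key, value)
```

In this model, a reader of a SEPARATED message can look at the key without decoding the payload, while an INLINE message has to be decoded as a whole to reach either part.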
Produce messages using a key/value schema.
Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
.topic(topicName)
.create();
final int key = 100;
final String value = "value-100";
// send the key/value message
producer.newMessage()
.value(new KeyValue<>(key, value))
.send();
Consume messages using a key/value schema.
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
...
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();
// receive key/value pair
Message<KeyValue<Integer, String>> msg = consumer.receive();
KeyValue<Integer, String> kv = msg.getValue();
Construct a key/value schema with INLINE encoding type.
//Prepare keyValue schema
std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
Produce messages using a key/value schema.
//Create Producer
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(keyValueSchema), producer);
//Prepare message
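std::string keyData = "{\"re\":2.1,\"im\":1.23}";
std::string valueData = keyData;  // separate copy: moving the same string twice would leave one side empty
KeyValue keyValue(std::move(keyData), std::move(valueData));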
Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
//Send message
producer.send(msg);
Consume messages using a key/value schema.
//Create Consumer
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(keyValueSchema), consumer);
//Receive message
Message message;
consumer.receive(message);
Avro
- Java
- C++
- Python
- Go
Suppose you have a SensorReading class as follows, and you’d like to transmit it over a Pulsar topic.
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
Create a Producer<SensorReading> (or Consumer<SensorReading>) like this:
Producer<SensorReading> producer = client.newProducer(AvroSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
// Send messages
static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.createProducer("topic-avro", producerConf, producer);
// Receive messages
static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
ConsumerConfiguration consumerConf;
Consumer consumer;
consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
You can declare an AvroSchema using Python through one of the following methods.
Method 1: Record
Declare an AvroSchema by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables.
class Example(Record):
a = Integer()
b = Integer()
producer = client.create_producer(
'avro-schema-topic',
schema=AvroSchema(Example))
r = Example(a=1, b=2)
producer.send(r)
consumer = client.subscribe(
'avro-schema-topic',
'sub',
schema=AvroSchema(Example))
msg = consumer.receive()
e = msg.value()
Method 2: JSON definition
Declare an AvroSchema using a JSON definition. Below is an example of an AvroSchema defined in a JSON file (company.avsc).
{
"doc": "this is doc",
"namespace": "example.avro",
"type": "record",
"name": "Company",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "address", "type": ["null", "string"]},
{"name": "employees", "type": ["null", {"type": "array", "items": {
"type": "record",
"name": "Employee",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "int"]}
]
}}]},
{"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
]
}
Load a schema definition from a file by using avro.schema or fastavro.schema.
If you use the JSON definition method to declare an AvroSchema, you need to:
- Use Python dict to produce and consume messages, which is different from using the Record method.
- Set the value of the _record_cls parameter to None when generating an AvroSchema object.
Example
from fastavro.schema import load_schema
from pulsar.schema import *
schema_definition = load_schema("examples/company.avsc")
avro_schema = AvroSchema(None, schema_definition=schema_definition)
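topic = 'avro-json-definition-topic'  # placeholder topic name
producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)
i = 0  # sample index used to vary the values below
company = {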
"name": "company-name" + str(i),
"address": 'xxx road xxx street ' + str(i),
"employees": [
{"name": "user" + str(i), "age": 20 + i},
{"name": "user" + str(i), "age": 30 + i},
{"name": "user" + str(i), "age": 35 + i},
],
"labels": {
"industry": "software" + str(i),
"scale": ">100",
"funds": "1000000.0"
}
}
producer.send(company)
msg = consumer.receive()
# value() returns the decoded message as a dict.
msg.value()
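Because messages are plain dicts under the JSON definition method, it can help to compare a dict against the schema's field names before sending it. The following is a minimal, stdlib-only sketch of such a check. `missing_fields` and `unknown_fields` are hypothetical helpers, not part of the Pulsar client; they look only at top-level field names, and the schema here is a trimmed inline copy of company.avsc.

```python
import json

# Trimmed inline copy of company.avsc (top-level fields only, "employees" omitted).
SCHEMA_JSON = """
{
  "type": "record",
  "name": "Company",
  "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "address", "type": ["null", "string"]},
    {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
  ]
}
"""


def missing_fields(schema_definition: dict, message: dict) -> list:
    """Return top-level schema field names that are absent from the message dict."""
    expected = {field["name"] for field in schema_definition["fields"]}
    return sorted(expected - set(message))


def unknown_fields(schema_definition: dict, message: dict) -> list:
    """Return message keys that the schema does not declare at the top level."""
    expected = {field["name"] for field in schema_definition["fields"]}
    return sorted(set(message) - expected)


schema_definition = json.loads(SCHEMA_JSON)
candidate = {"name": "company-name0", "adress": "xxx road xxx street 0", "labels": {}}

print(missing_fields(schema_definition, candidate))
print(unknown_fields(schema_definition, candidate))
```

Here the misspelled "adress" key shows up both as a missing "address" field and as an unknown key, which is the kind of mistake the Record method would catch through its class definition.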
Suppose you have an avroExampleStruct struct as follows, and you’d like to transmit it over a Pulsar topic.
type avroExampleStruct struct {
ID int
Name string
}
Add an exampleSchemaDef like this:
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
Create producer and consumer to send/receive messages:
//Create producer and send message
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: avroExampleStruct{
ID: 10,
Name: "avroExampleStruct",
},
})
//Create Consumer and receive message
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
message, err := consumer.Receive(context.Background())
JSON
- Java
- C++
- Python
- Go
Similar to using AvroSchema, you can declare a JsonSchema by passing a class. The only difference is to use JsonSchema instead of AvroSchema when defining the schema type, as shown below. For how to use AvroSchema via record, see Method 1: Record.
static class SchemaDemo {
public String name;
public int age;
}
Producer<SchemaDemo> producer = pulsarClient.newProducer(Schema.JSON(SchemaDemo.class))
.topic("my-topic")
.create();
Consumer<SchemaDemo> consumer = pulsarClient.newConsumer(Schema.JSON(SchemaDemo.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
SchemaDemo schemaDemo = new SchemaDemo();
schemaDemo.name = "pulsar";
schemaDemo.age = 20;
producer.newMessage().value(schemaDemo).send();
Message<SchemaDemo> message = consumer.receive(5, TimeUnit.SECONDS);
To declare a JSON schema using C++, do the following:
Pass a JSON string like this:
std::string jsonSchema = R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schemaInfo = SchemaInfo(JSON, "JSON", jsonSchema);
Create a producer and use it to send messages.
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
Create a consumer and receive a message.
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg);
You can declare a JsonSchema by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables. This is similar to using AvroSchema. The only difference is to use JsonSchema instead of AvroSchema when defining the schema type, as shown below. For how to use AvroSchema via record, see Method 1: Record.
producer = client.create_producer(
    'json-schema-topic',
    schema=JsonSchema(Example))
consumer = client.subscribe(
    'json-schema-topic',
    'sub',
    schema=JsonSchema(Example))
Suppose you have a jsonExampleStruct struct as follows, and you’d like to transmit it in JSON form over a Pulsar topic.
type jsonExampleStruct struct {
ID int `json:"id"`
Name string `json:"name"`
}
- Add a jsonSchemaDef like this:
var jsonSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
    "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
- Create a producer/consumer to send/receive messages:
//Create producer and send message
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewJSONSchema(jsonSchemaDef, nil),
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: jsonExampleStruct{
ID: 10,
Name: "jsonExampleStruct",
},
})
//Create Consumer and receive message
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewJSONSchema(jsonSchemaDef, nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
message, err := consumer.Receive(context.Background())
ProtobufNative
- Java
- C++
The following example shows how to create a producer/consumer with a ProtobufNative schema using Java.
Generate the DemoMessage class using Protobuf3 or later versions.
syntax = "proto3";
message DemoMessage {
string stringField = 1;
double doubleField = 2;
int32 intField = 6;
TestEnum testEnum = 4;
SubMessage nestedField = 5;
repeated string repeatedField = 10;
proto.external.ExternalMessage externalMessage = 11;
}
Create a producer/consumer to send/receive messages.
Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
.topic("my-topic")
.create();
Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
    .setIntField(1).build()).send();
Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
The following example shows how to create a producer/consumer with a ProtobufNative schema.
Generate the User class using Protobuf3 or later versions.
syntax = "proto3";
message User {
string name = 1;
int32 age = 2;
}
Include the ProtobufNativeSchema.h in your source code. Ensure the Protobuf dependency has been added to your project.
#include <pulsar/ProtobufNativeSchema.h>
Create a producer to send a User instance.
ProducerConfiguration producerConf;
producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
Producer producer;
client.createProducer("topic-protobuf", producerConf, producer);
User user;
user.set_name("my-name");
user.set_age(10);
std::string content;
user.SerializeToString(&content);
producer.send(MessageBuilder().setContent(content).build());
Create a consumer to receive a User instance.
ConsumerConfiguration consumerConf;
consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
Consumer consumer;
client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer);
Message msg;
consumer.receive(msg);
User user2;
user2.ParseFromArray(msg.getData(), msg.getLength());
Protobuf
- Java
- C++
- Go
Constructing a protobuf schema using Java is similar to constructing a ProtobufNative schema. The only difference is to use PROTOBUF instead of PROTOBUF_NATIVE when defining the schema type, as shown below.
Generate the DemoMessage class using Protobuf3 or later versions.
syntax = "proto3";
message DemoMessage {
string stringField = 1;
double doubleField = 2;
int32 intField = 6;
TestEnum testEnum = 4;
SubMessage nestedField = 5;
repeated string repeatedField = 10;
proto.external.ExternalMessage externalMessage = 11;
}
Create a producer/consumer to send/receive messages.
Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF(DemoMessage.class))
.topic("my-topic")
.create();
Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF(DemoMessage.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
    .setIntField(1).build()).send();
Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
Constructing a protobuf schema using C++ is similar to constructing a JSON schema. The only difference is to use PROTOBUF instead of JSON when defining the schema type, as shown below.
std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schemaInfo = SchemaInfo(pulsar::PROTOBUF, "PROTOBUF", jsonSchema);
Create a producer to send messages.
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
Create a consumer to receive messages.
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg);
Suppose you have a protobufDemo struct as follows, and you’d like to transmit it over a Pulsar topic.
type protobufDemo struct {
Num int32 `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
Msf string `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
Add a protoSchemaDef like this:
var (
protoSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
)
Create a producer/consumer to send/receive messages:
psProducer := pulsar.NewProtoSchema(protoSchemaDef, nil)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "proto",
Schema: psProducer,
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &protobufDemo{
Num: 100,
Msf: "pulsar",
},
})
psConsumer := pulsar.NewProtoSchema(protoSchemaDef, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "proto",
SubscriptionName: "sub-1",
Schema: psConsumer,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
msg, err := consumer.Receive(context.Background())
Native Avro
This example shows how to construct a native Avro schema.
org.apache.avro.Schema nativeAvroSchema = … ;
Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
byte[] content = … ;
producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
AUTO_PRODUCE
Suppose you have a Pulsar topic P, a producer processing messages from a Kafka topic K, and an application reading the messages from K and writing them to P.
This example shows how to construct an AUTO_PRODUCE schema to verify whether the bytes produced by K can be sent to P.
Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
…
.create();
byte[] kafkaMessageBytes = … ;
pulsarProducer.send(kafkaMessageBytes);
AUTO_CONSUME
Suppose you have a Pulsar topic P and a consumer MySQL that receives messages from P, and you want to check if these messages have the information that your application needs to count.
This example shows how to construct an AUTO_CONSUME schema to verify whether the bytes produced by P can be sent to MySQL.
Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
…
.subscribe();
Message<GenericRecord> msg = pulsarConsumer.receive();
GenericRecord record = msg.getValue();
record.getFields().forEach((field -> {
if (field.getName().equals("theNeedFieldName")) {
Object recordField = record.getField(field);
//Do some things
}
}));
Customize schema storage
By default, Pulsar stores schemas of all data types in the Apache BookKeeper cluster deployed alongside Pulsar. Alternatively, you can use another storage system if needed.
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces before deploying custom schema storage:
Implement SchemaStorage interface
The SchemaStorage interface has the following methods:
public interface SchemaStorage {
// How schemas are updated
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
// How schemas are fetched from storage
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
// How schemas are deleted
CompletableFuture<SchemaVersion> delete(String key);
// Utility method for converting a schema version byte array to a SchemaVersion object
SchemaVersion versionFromBytes(byte[] version);
// Startup behavior for the schema storage client
void start() throws Exception;
// Shutdown behavior for the schema storage client
void close() throws Exception;
}
tip
For a complete example of schema storage implementation, see the BookKeeperSchemaStorage class.
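To make the contract of these methods more concrete, here is a rough in-memory model of the put/get/delete life cycle, written in Python purely for illustration. A real implementation has to be written in Java against the interface above and return CompletableFuture results. The class name, the use of integers as versions, and the 8-byte big-endian version encoding are all assumptions of this sketch.

```python
import struct
from typing import Dict, List, Optional, Tuple


class InMemorySchemaStorage:
    """Toy stand-in for a schema store: each key maps to a list of (value, hash) versions."""

    def __init__(self) -> None:
        self._store: Dict[str, List[Tuple[bytes, bytes]]] = {}
        self.started = False

    def start(self) -> None:
        # Startup behavior: a real store would open connections here.
        self.started = True

    def close(self) -> None:
        # Shutdown behavior: a real store would release resources here.
        self.started = False

    def put(self, key: str, value: bytes, hash_: bytes) -> int:
        """Append a new schema version for the key and return its version number."""
        versions = self._store.setdefault(key, [])
        versions.append((value, hash_))
        return len(versions) - 1

    def get(self, key: str, version: Optional[int] = None) -> Optional[Tuple[bytes, int]]:
        """Fetch (value, version); None as the version means the latest one."""
        versions = self._store.get(key)
        if not versions:
            return None
        if version is None:
            version = len(versions) - 1
        if not 0 <= version < len(versions):
            return None
        return versions[version][0], version

    def delete(self, key: str) -> Optional[int]:
        """Remove every version for the key and return the last version that existed."""
        versions = self._store.pop(key, None)
        return len(versions) - 1 if versions else None

    @staticmethod
    def version_from_bytes(raw: bytes) -> int:
        """Decode a version from 8 big-endian bytes (assumed encoding)."""
        return struct.unpack(">q", raw)[0]


storage = InMemorySchemaStorage()
storage.start()
first = storage.put("tenant/ns/topic", b"schema-v0", b"hash-0")
second = storage.put("tenant/ns/topic", b"schema-v1", b"hash-1")
latest = storage.get("tenant/ns/topic")
deleted = storage.delete("tenant/ns/topic")
storage.close()
```

The sketch keeps every version of a schema under its key, so fetching an older version after an update still works until the key is deleted.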
Implement SchemaStorageFactory interface
The SchemaStorageFactory interface has the following method:
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
tip
For a complete example of schema storage factory implementation, see the BookKeeperSchemaStorageFactory class.
Deploy custom schema storage
To use your custom schema storage implementation, perform the following steps.
Package the implementation in a JAR file.
Add the JAR file to the lib folder in your Pulsar binary or source distribution.
Change the schemaRegistryStorageClassName configuration in the conf/broker.conf file to your custom factory class.
Start Pulsar.
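As a sketch of the configuration step, the entry in conf/broker.conf might look like the following. The class name com.example.schema.MySchemaStorageFactory is hypothetical; substitute the fully qualified name of your own SchemaStorageFactory implementation.

```properties
# conf/broker.conf
# Hypothetical factory class packaged in the JAR placed under lib/
schemaRegistryStorageClassName=com.example.schema.MySchemaStorageFactory
```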