本章主要介绍 Pulsar schema 及其重要性。
Schema 管理
对于围绕消息总线如pulsar搭建的应用来说,类型安全非常的重要。
Producer 和 consumer 需要某种机制,以在主题级别上协调类型,从而避免出现各种潜在问题。 例如,序列化和反序列化问题。
应用程序通常采用以下方法来保证消息传递中的类型安全。 这两种方法都被Pulsar支持,你可以在topic的基础上,自由选择采用哪一种,或者混用。
客户端方法
Producer 和 consumer 不仅负责序列化和反序列化消息(由原始字节组成),还负责“确认”topic 正在传输的类型。
如果 producer 正在发送关于topic-1
的温度传感器数据,则该 topic 的 consumer 在尝试解析数据为湿度传感器读数时会遇到问题。
Producer 和 consumer 可以发送并接收包含原始字节数组的消息,并在“带外”基础上将所有类型的安全强制措施留给应用程序。
服务器端方法
Producer 和 consumer 通知系统可以通过该 topic 传输哪些类型的数据。
通过这种方法,消息系统强制执行类型安全, 并确保生产者和消费者保持同步。
Pulsar has a built-in schema registry that enables clients to upload data schemas on a per-topic basis. 这些schema显示了,topic可以识别哪些数据类型为有效。
为什么使用 schema
启用 schema 后,Pulsar 会解析数据,即接收字节作为输入并发送字节作为输出。 虽然数据不仅是字节,但的确需要解析这些数据,解析时还可能发生解析异常,解析异常主要出现在以下几种情况中:
字段不存在
字段类型已更改(例如,将
string
更改为int
)
有几种方法可以避免并解决这些异常,例如,在解析错误时捕获异常会造成代码难以维护;采用 schema 管理系统来进化 schema,而不破坏下游应用程序;最大程度执行所使用语言的类型安全,Pulsar Schema 能够解决上述问题。
在创建并处理从简单类型(如:string
)到更复杂的因应用程序而异的数据类型时,通过 Pulsar schema 可以使用特定语言的数据类型。
示例
You can use the User class to define the messages sent to Pulsar topics.
public class User {
String name;
int age;
}
When constructing a producer with the User class, you can specify a schema or not as below.
无 Schema 的情况
在不指定 schema 的情况下创建 producer,则 producer 只能 produce byte[]
类型的消息。 如果有 POJO 类,则需要在发送消息前将 POJO 序列化为字节。
示例
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
User user = new User(“Tom”, 28);
byte[] message = … // serialize the `user` by yourself;
producer.send(message);
有 Schema 的情况
如果通过指定 schema 来创建 producer,则可以直接将类发送到 topic,而无需考虑如何将 POJO 序列化为字节。
示例
This example constructs a producer with the JSONSchema, and you can send the User class to topics directly without worrying about how to serialize it into bytes.
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User(“Tom”, 28);
producer.send(user);
总结
在使用 schema 创建 producer 时,不需要将消息序列化为字节,Pulsar cchema 会在后台执行此操作。