本章主要介绍 Pulsar schema 及其重要性。

Schema 管理

对于围绕消息总线如pulsar搭建的应用来说,类型安全非常的重要。

Producer 和 consumer 需要某种机制,以在主题级别上协调类型,从而避免出现各种潜在问题。 例如,序列化和反序列化问题。

应用程序通常采用以下方法来保证消息传递中的类型安全。 这两种方法都被Pulsar支持,你可以在topic的基础上,自由选择采用哪一种,或者混用。

客户端方法

Producer 和 consumer 不仅负责序列化和反序列化消息(由原始字节组成),还负责“确认”topic 正在传输的类型。

如果 producer 正在发送关于topic-1的温度传感器数据,则该 topic 的 consumer 在尝试解析数据为湿度传感器读数时会遇到问题。

Producer 和 consumer 可以发送并接收包含原始字节数组的消息,并在“带外”基础上将所有类型的安全强制措施留给应用程序。

服务器端方法

Producer 和 consumer 通知系统可以通过该 topic 传输哪些类型的数据。

通过这种方法,消息系统强制执行类型安全, 并确保生产者和消费者保持同步。

Pulsar has a built-in schema registry that enables clients to upload data schemas on a per-topic basis. 这些schema显示了,topic可以识别哪些数据类型为有效。

为什么使用 schema

启用 schema 后,Pulsar 会解析数据,即接收字节作为输入并发送字节作为输出。 虽然数据不仅是字节,但的确需要解析这些数据,解析时还可能发生解析异常,解析异常主要出现在以下几种情况中:

  • 字段不存在

  • 字段类型已更改(例如,将 string 更改为 int

有几种方法可以避免并解决这些异常,例如,在解析错误时捕获异常会造成代码难以维护;采用 schema 管理系统来进化 schema,而不破坏下游应用程序;最大程度执行所使用语言的类型安全,Pulsar Schema 能够解决上述问题。

在创建并处理从简单类型(如:string)到更复杂的因应用程序而异的数据类型时,通过 Pulsar schema 可以使用特定语言的数据类型。

示例

You can use the User class to define the messages sent to Pulsar topics.

  1. public class User {
  2. String name;
  3. int age;
  4. }

When constructing a producer with the User class, you can specify a schema or not as below.

无 Schema 的情况

在不指定 schema 的情况下创建 producer,则 producer 只能 produce byte[] 类型的消息。 如果有 POJO 类,则需要在发送消息前将 POJO 序列化为字节。

示例

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(topic)
  3. .create();
  4. User user = new User(“Tom”, 28);
  5. byte[] message = // serialize the `user` by yourself;
  6. producer.send(message);

有 Schema 的情况

如果通过指定 schema 来创建 producer,则可以直接将类发送到 topic,而无需考虑如何将 POJO 序列化为字节。

示例

This example constructs a producer with the JSONSchema, and you can send the User class to topics directly without worrying about how to serialize it into bytes.

  1. Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
  2. .topic(topic)
  3. .create();
  4. User user = new User(“Tom”, 28);
  5. producer.send(user);

总结

在使用 schema 创建 producer 时,不需要将消息序列化为字节,Pulsar cchema 会在后台执行此操作。