Schema evolution and compatibility

通常情况下,Schema 不会在很长时间内保持不变。 相反地,它们会不断更新,以满足新的需求。

本章将探讨 Pulsar Schema 如何演化以及 Pulsar Schema 的兼容性检查策略。

Schema 演化

SchemaInfo 的数据结构中定义 Pulsar schema 。

每个用 topic 存储的 SchemaInfo 都有一个版本。 版本信息用于管理 topic 内发生的 schema 更改。

使用 SchemaInfo 生成的消息用 schema 版本进行标记。 当消息被 Pulsar 客户端 consume 时,Pulsar 客户端可以使用 schema 版本检索相应的 SchemaInfo,并使用正确的 schema 信息来反序列化数据。

什么是 Schema 演化?

属性和类型的详细信息存储在 Schemas 中。 To satisfy new business requirements, you need to update schemas inevitably over time, which is called schema evolution.

Schema 的任何变化都会影响到下游 consumers。 Schema 演化确保了下游 consumers 能够无缝地处理以旧 schemas 和以新 schemas 编码的数据。

Pulsar schema 应如何演化?

答案就是 Pulsar schema 兼容性检查策略。 这一策略决定了 schema 如何将 topics 中的新、旧 schema 进行比较。

了解更多信息,请参阅 Schema 兼容性检查策略

Pulsar 对 Schema 演化提供了怎样的支持?

  1. 当 producer/consumer/ 阅读器连接到 broker 时,broker 会部署由 schemaRegistryCompatibilityCheckers 配置的 schema 兼容性检查器来进行 schema 兼容性检查。

    Schema 兼容性检查器是每个 schema 类型都有的实例。

    目前,Avro 和 JSON 有自己的兼容性检查器,所有其他 schema 类型共享默认兼容性检查器,而此默认检查器禁用 schema 演化。

  2. Producer/consumer/ 阅读器将其客户端 SchemaInfo 发送到 broker。

  3. Broker 知道 schema 类型,并找到此类型的 schema 兼容性检查器。

  4. Broker 使用检查器,以兼容性检查策略来检查 SchemaInfo 是否与 topic 的最新 schema 兼容。

    目前,兼容性检查策略是在命名空间级别配置的,适用于命名空间中的所有 topics。

Schema 兼容性检查策略

Pulsar 有 8 种 schema 兼容性检查策略,如下所示。

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略

定义

更改

检查 Schema 版本

优先升级

ALWAYS_COMPATIBLE

禁用 schema 兼容性检查。

允许所有更改

所有旧版本

任何请求

ALWAYS_INCOMPATIBLE

禁用 schema 演化。

禁用更改

BACKWARD

使用 schema V3 的 consumer 可以处理 producer 使用 schema V3 或 V2 编写的数据。

  • 添加可选字段

  • 删除字段

最新版本

Consumers

BACKWARD_TRANSITIVE

使用 schema V3 的 consumer 可以处理 producer 使用 schema V3、V2 或 V1 编写的数据。

  • 添加可选字段

  • 删除字段

所有旧版本

Consumers

FORWARD

使用 schema V3 或 V2 的 consumer 可以处理 producer 使用 schema V3 编写的数据。

  • 添加字段

  • 删除可选字段

最新版本

Producers

FORWARD_TRANSITIVE

使用 schema V3、V2 或 V1 的 consumer 可以处理 producer 使用 schema V3 编写的数据。

  • 添加字段

  • 删除可选字段

所有旧版本

Producers

FULL

Schema V3 和 V2 之间向后和向前兼容。

  • 修改可选字段

最新版本

任何请求

FULL_TRANSITIVE

Schema V3、V2 和 V1 之间向后和向前兼容。

  • 修改可选字段

所有旧版本

任何请求

ALWAYS_COMPATIBLE 和 ALWAYS_INCOMPATIBLE

兼容性检查策略

定义

Note

ALWAYS_COMPATIBLE

禁用 schema 兼容性检查。

ALWAYS_INCOMPATIBLE

禁用 schema 演化,即不允许 schema 更改。

  • 除 Avro 和 JSON 以外所有类型的 schema,其默认 schema 兼容性检查策略为 ALWAYS_INCOMPATIBLE

  • Avro 和 JSON 的默认 schema 兼容性检查策略为 `FULL’。

示例

  • 示例 1

    在某些情况下,应用程序需要将几种不同类型的事件存储在同一个 Pulsar topic 中。

    尤其是在 事件来源 风格中开发数据模型时,这种情况下会遇到多种影响实体状态的事件。

    例如,对于用户实体来说,有 用户创建用户地址变更用户查询已收到 等事件。 应用程序要求按相同顺序读取这些事件。

    因此,这些事件需要进入同一个 Pulsar 分区以保证顺序。 应用程序可以使用 ALWAYS_COMPATIBLE 策略,将不同类型的事件存储在同一个 topic 中。

  • 示例 2

    有时,我们需要将兼容状态更改为不兼容状态。

    For example, you are modifying a field type from string to int.

    在这种情况下,你需要:

    • 同时更新 producer 和 consumer 版本更新至新 schema 版本。

    • 可选操作,创建一个新的 topic,并迁移应用程序以使用新 topic 和新 schema, 避免在同一 topic 中处理两个不兼容版本的操作。

BACKWARD 和 BACKWARD_TRANSITIVE

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略定义说明

BACKWARD | Consumers using the new schema can process data written by producers using the last schema. | 使用 schema V3 的 consumer 可以处理 producer 使用 schema V3 或 V2 写入的数据。 | BACKWARD_TRANSITIVE | Consumers using the new schema can process data written by producers using all previous schemas. | 使用 schema V3 的 consumer 可以处理 producer 使用 schema V3、V2 或 V1 写入的数据。 |

示例

  • 示例 1

    删除字段。

    为处理不含某一字段的事件而构建的 consumer 可以处理旧 schema 编写的、包含此字段的事件,consumer 在处理时会忽略这一字段。

  • 示例 2

    想要将所有 Pulsar 数据加载到 Hive 数据仓库中,并进行 SQL 查询。

    即使数据被更改,仍然可以进行相同的 SQL 查询。 可以通过 BACKWARD 策略演化 schema 以实现上述操作。

FORWARD 和 FORWARD_TRANSITIVE

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略定义说明

FORWARD | Consumers using the last schema can process data written by producers using a new schema, even though they may not be able to use the full capabilities of the new schema. | 使用schema V3 或 V2 的 consumer 可以处理 producer 使用 schema V3 写入的数据。 | FORWARD_TRANSITIVE | Consumers using all previous schemas can process data written by producers using a new schema. | 使用 schema V3、V2 或 V1 的 consumer 可以处理 producer 使用 schema V3 写入的数据。

示例

  • 示例 1

    添加字段。

    在大多数数据格式中,写入 consumer 的进程事件中如果不包含新字段,则该 consumer 在收到包含新字段的新事件后仍然可以写入。

  • 示例 2

    如果 consumer 的一个应用程序逻辑绑定了一个完整版本的 schema ,则当 schema 演化时,应用程序逻辑可能无法立即更新。

    在这种情况下,需要用新 schema 将数据投射到应用程序可以理解的旧 schema 上。

    因此,可以使用 FORWARD 策略来演化 schema,以确保旧 schema 能够处理新schema 编码的数据。

FULL 和 FULL_TRANSITIVE

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略

定义

说明

Note

FULL

Schema 既向后兼容,也向前兼容,也就是说:

使用最新 schema 的 consumer 可以处理 producer 使用新 schema 编写的数据。

并且

使用 schema 的 consumer 可以处理 producer 使用最新 schema 编写的数据。

使用 schema V3 的 consumer 可以处理 producer 使用 schema V3 或 V2 编写的数据。

并且

使用 schema V3 或 V2 的 consumer 可以处理 producer 使用 schema V3 编写的数据。

  • Avro 和 JSON 的默认 schema 兼容性检查策略为 `FULL’。

  • 除 Avro 和 JSON 以外所有类型的 schema,其默认 schema 兼容性检查策略为 ALWAYS_INCOMPATIBLE

FULL_TRANSITIVE

新的 schema 可以与所有注册过的 schema 向后或向前兼容。

使用 schema V3 的 consumer 可以处理 producer 使用 schema V3、V2 或 V1 编写的数据。

并且

使用 schema V3、V2 或 V1 的 consumer 可以处理 producer 使用 schema V3 编写的数据。

示例

在某些数据格式中(例如:Avro),可以使用默认值来定义字段。 这样,添加或删除这些字段的更改是完全可兼容的。

Schema 验证

当 producer 或 consumer 尝试连接到某个 topic 时,broker 会进行检查以验证 schema。

生产者(Producer)

当 producer 试图连接到某个 topic 时(假定忽略 schema 自动创建),broker 会进行以下检查:

  • 检查 producer 携带的 schema 是否存在于 schema 注册表中。

    • 如果 schema 已经注册,那么 producer 就与 broker 连接,并 produce 与此 schema 相关的消息。

    • 如果 schema 没有注册,那么 Pulsar 会验证是否允许基于配置的兼容性检查策略对 schema 进行注册。

消费者(Consumer)

当 consumer 尝试连接到某个 topic 时,broker 会根据配置的 schema 兼容性检查策略来检查 consumer 所携带的 schema 是否与注册的 schema 兼容。

兼容性检查策略

检查逻辑

ALWAYS_COMPATIBLE

全部通过

ALWAYS_INCOMPATIBLE

不能通过

BACKWARD

可读取上一个 schema

BACKWARD_TRANSITIVE

可以读取所有 schemas

FORWARD

可读取上一个 schema

FORWARD_TRANSITIVE

可读取上一个 schema

FULL

可读取上一个 schema

FULL_TRANSITIVE

可以读取所有 schemas

升级客户端的顺序

升级客户端应用程序的顺序由兼容性检查策略决定

比如,当producers使用schemas来将数据写入Pulsar,并且consumers使用schemas来从Pulsar读取数据。

兼容性检查策略

优先升级

说明

ALWAYS_COMPATIBLE

任何请求

兼容性检查被禁用。

因此,您可以以任何顺序升级producers与consumers

ALWAYS_INCOMPATIBLE

schema演化被禁用

  • BACKWARD

  • BACKWARD_TRANSITIVE

Consumers

不保证使用旧schema的consumer能读取使用新schema的producer产生的数据。

因此,优先升级所有consumers,然后再开始生产新的数据

  • FORWARD

  • FORWARD_TRANSITIVE

Producers

不保证使用新schema的consumers能读取使用旧schema的producer产生的数据。

因此,优先升级所有producers,让它们用上新的schema,并且保证先前使用旧schema生成的数据不再对consumers可用,然后再去升级consumers

  • FULL

  • FULL_TRANSITIVE

任何请求

不保证使用旧schema的consumers能读取使用新schema生产的数据,或使用新schema的consumers可以读取旧schema生产的数据

因此,您可以以任何顺序升级producers与consumers