Manage schema
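This guide describes how to manage schemas:
Automatically
Manually
Schema AutoUpdate
If a schema passes the schema compatibility check, a Pulsar producer automatically updates this schema to the topic it produces to by default.
AutoUpdate for producer
For a producer, AutoUpdate happens in the following cases: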
- If a topic doesn't have a schema, Pulsar registers a schema automatically.
- If a topic has a schema:
  - If a producer doesn't carry a schema:
    - If isSchemaValidationEnforced or schemaValidationEnforced is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.
    - If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.
  - If a producer carries a schema, the broker performs the compatibility check based on the compatibility check strategy configured for the namespace to which the topic belongs.
    - If the schema is registered, the producer is connected to the broker.
    - If the schema is not registered:
      - If isAllowAutoUpdateSchema is set to false, the producer is rejected and cannot connect to the broker.
      - If isAllowAutoUpdateSchema is set to true:
        - If the schema passes the compatibility check, the broker registers a new schema for the topic automatically and the producer is connected.
        - If the schema does not pass the compatibility check, the broker does not register the schema and the producer's connection request is rejected.
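The producer rules above can be read as a decision tree. The following is a minimal sketch in plain Java that models those rules. It is not broker code: the class ProducerAutoUpdate, the Decision enum, and every parameter name are hypothetical names introduced for illustration.

```java
// Hypothetical model of the producer AutoUpdate rules; not Pulsar broker code.
final class ProducerAutoUpdate {

    enum Decision { CONNECT, REGISTER_AND_CONNECT, REJECT }

    static Decision decide(boolean topicHasSchema,
                           boolean producerHasSchema,
                           boolean schemaValidationEnforced,
                           boolean schemaAlreadyRegistered,
                           boolean allowAutoUpdateSchema,
                           boolean passesCompatibilityCheck) {
        if (!topicHasSchema) {
            // The text only says a schema is registered automatically in this case.
            return Decision.REGISTER_AND_CONNECT;
        }
        if (!producerHasSchema) {
            // A schemaless producer is accepted only when validation is not enforced.
            return schemaValidationEnforced ? Decision.REJECT : Decision.CONNECT;
        }
        if (schemaAlreadyRegistered) {
            return Decision.CONNECT;
        }
        if (!allowAutoUpdateSchema) {
            return Decision.REJECT;
        }
        // AutoUpdate is allowed: the compatibility check decides the outcome.
        return passesCompatibilityCheck ? Decision.REGISTER_AND_CONNECT : Decision.REJECT;
    }

    public static void main(String[] args) {
        // A schemaless producer on a namespace that enforces schema validation.
        System.out.println(decide(true, false, true, false, true, true));
    }
}
```

Modeling the flags as plain booleans keeps the sketch self-contained. In a real deployment these values come from the namespace policies and the schema registry.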
AutoUpdate for consumer
For a consumer, AutoUpdate happens in the following cases:
- If a consumer connects to a topic without a schema (which means the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check.
- If a consumer connects to a topic with a schema:
  - If the topic has none of the following (a schema, data, a local consumer, or a local producer):
    - If isAllowAutoUpdateSchema is set to true, the consumer registers a schema and is connected to the broker.
    - If isAllowAutoUpdateSchema is set to false, the consumer is rejected and cannot connect to the broker.
  - If the topic has at least one of the following (a schema, data, a local consumer, or a local producer), the schema compatibility check is performed:
    - If the schema passes the compatibility check, the consumer is connected to the broker.
    - If the schema does not pass the compatibility check, the consumer is rejected and cannot connect to the broker.
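The consumer rules above can be sketched the same way. This is a simplified model in plain Java, not broker code. The class ConsumerAutoUpdate and its parameter names are hypothetical, and the sketch assumes that "without a schema" refers to the consumer not carrying a schema and that "topicIsEmpty" means the topic has no schema, no data, and no local consumer or producer.

```java
// Hypothetical model of the consumer AutoUpdate rules; not Pulsar broker code.
final class ConsumerAutoUpdate {

    enum Decision { CONNECT, REGISTER_AND_CONNECT, REJECT }

    static Decision decide(boolean consumerHasSchema,
                           boolean topicIsEmpty,
                           boolean allowAutoUpdateSchema,
                           boolean passesCompatibilityCheck) {
        if (!consumerHasSchema) {
            // Raw-bytes consumer: no compatibility check is performed.
            return Decision.CONNECT;
        }
        if (topicIsEmpty) {
            // Nothing to check against, so the AutoUpdate policy decides.
            return allowAutoUpdateSchema ? Decision.REGISTER_AND_CONNECT : Decision.REJECT;
        }
        // The topic has a schema, data, or local clients: run the compatibility check.
        return passesCompatibilityCheck ? Decision.CONNECT : Decision.REJECT;
    }
}
```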
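Manage AutoUpdate strategy
You can use the pulsar-admin command to manage the AutoUpdate strategy as follows:
Enable AutoUpdate
To enable AutoUpdate on a namespace, you can use the pulsar-admin command.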
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
Disable AutoUpdate
To disable AutoUpdate on a namespace, you can use the pulsar-admin command.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
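Once AutoUpdate is disabled, you can only register a new schema using the pulsar-admin command.
Adjust compatibility
To adjust the schema compatibility level on a namespace, you can use the pulsar-admin command.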
bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
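Schema validation
By default, schemaValidationEnforced is disabled for producers:
This means that a producer without a schema can produce any kind of message to a topic with schemas, which may result in trash data in the topic.
This allows non-Java clients that don't support schemas to produce messages to a topic with schemas.
However, if you want a stronger guarantee on topics with schemas, you can enable schemaValidationEnforced across the whole cluster or on a per-namespace basis.
Enable schema validation
To enable schemaValidationEnforced on a namespace, you can use the pulsar-admin command.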
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
Disable schema validation
To disable schemaValidationEnforced on a namespace, you can use the pulsar-admin command.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
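Schema manual management
To manage schemas, you can use one of the following methods.
Method | Description |
---|---|
Admin CLI | You can use the pulsar-admin tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants, and so on. For more information, see the pulsar-admin documentation. |
REST API | Pulsar exposes schema-related management APIs in its admin RESTful API. You can access the admin RESTful endpoints directly to manage schemas. For more information about how to use the Pulsar REST API, see the Pulsar REST API documentation. |
Java Admin API | Pulsar provides an admin library in Java. |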
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
Admin CLI
REST API
Java Admin API
Use the upload subcommand.
$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file is in JSON format.
{
    "type": "<schema-type>",
    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
The schema-definition-file includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
Here are examples of the schema-definition-file for a JSON schema.
Example 1
{ "type": "JSON", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", "properties": {}}
Example 2
{ "type": "STRING", "schema": "", "properties": { "key1": "value1" }}
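Because the schema field is itself a string, a JSON or Avro definition placed in it must have its double quotes escaped, as Example 1 shows. The following is a minimal sketch of producing such file content in plain Java. The class SchemaDefinitionFile and its methods are hypothetical helpers, and the escaping covers only backslashes and double quotes, so it assumes the definition contains no control characters such as newlines.

```java
// Hypothetical helper for building schema-definition-file content.
final class SchemaDefinitionFile {

    // Escapes backslashes and double quotes so the definition fits in a JSON string.
    static String escape(String definition) {
        return definition.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders the three documented fields with an empty properties object.
    static String render(String type, String definition) {
        return "{ \"type\": \"" + type + "\", "
                + "\"schema\": \"" + escape(definition) + "\", "
                + "\"properties\": {} }";
    }

    public static void main(String[] args) {
        String definition = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[]}";
        System.out.println(render("JSON", definition));
    }
}
```

For anything beyond a quick script, a JSON library is a safer way to produce the file, since it handles every escape case.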
Send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema
The post payload is in JSON format.
{
    "type": "<schema-type>",
    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
The post payload includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
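As an illustration of the REST call, the following sketch builds the POST request with the JDK's java.net.http API (Java 11 or later). The class SchemaUploadRequest, the admin base URL http://localhost:8080, and the tenant, namespace, and topic names are assumptions made for the example. The sketch only constructs the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds the schema upload request without sending it.
final class SchemaUploadRequest {

    static HttpRequest build(String adminBaseUrl, String tenant, String namespace,
                             String topic, String payloadJson) {
        // Endpoint pattern: /admin/v2/schemas/:tenant/:namespace/:topic/schema
        URI uri = URI.create(adminBaseUrl + "/admin/v2/schemas/"
                + tenant + "/" + namespace + "/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }

    public static void main(String[] args) {
        String payload = "{\"type\":\"STRING\",\"schema\":\"\",\"properties\":{\"key1\":\"value1\"}}";
        HttpRequest request = build("http://localhost:8080", "my-tenant", "my-ns", "my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running broker.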
void createSchema(String topic, PostSchemaPayload schemaPayload)
The PostSchemaPayload includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
Here is an example of PostSchemaPayload:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
Get a schema (latest)
To get the latest schema for a topic, you can use one of the following methods.
Admin CLI
REST API
Java Admin API
Use the get subcommand.
$ pulsar-admin schemas get <topic-name>
{
    "version": 0,
    "type": "String",
    "timestamp": 0,
    "data": "string",
    "properties": {
        "property1": "string",
        "property2": "string"
    }
}
Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response, which is returned in JSON format.
{
    "version": "<the-version-number-of-the-schema>",
    "type": "<the-schema-type>",
    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
The response includes the following fields:
Field | Description |
---|---|
version | The schema version, which is a long number. |
type | The schema type. |
timestamp | The timestamp of creating this version of the schema. |
data | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
SchemaInfo getSchemaInfo(String topic)
The SchemaInfo includes the following fields:
Field | Description |
---|---|
name | The schema name. |
type | The schema type. |
schema | A byte array of the schema definition data, which is encoded in UTF 8 charset. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be a JSON string of the Avro schema definition converted to a byte array. |
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
Get a schema (specific)
To get a specific version of a schema, you can use one of the following methods.
Admin CLI
REST API
Java Admin API
Use the get subcommand.
$ pulsar-admin schemas get <topic-name> --version=<version>
Send a GET request to a schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version
Here is an example of a response, which is returned in JSON format.
{
    "version": "<the-version-number-of-the-schema>",
    "type": "<the-schema-type>",
    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
The response includes the following fields:
Field | Description |
---|---|
version | The schema version, which is a long number. |
type | The schema type. |
timestamp | The timestamp of creating this version of the schema. |
data | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
SchemaInfo getSchemaInfo(String topic, long version)
The SchemaInfo includes the following fields:
Field | Description |
---|---|
name | The schema name. |
type | The schema type. |
schema | A byte array of the schema definition data, which is encoded in UTF 8 charset. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be a JSON string of the Avro schema definition converted to a byte array. |
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
Extract a schema
To provide a schema via a topic, you can use the following method.
Admin CLI
Use the extract subcommand.
$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>
Delete a schema
To delete a schema for a topic, you can use one of the following methods.
Note
In any case, the delete action deletes all versions of a schema registered for a topic.
Admin CLI
REST API
Java Admin API
Use the delete subcommand.
$ pulsar-admin schemas delete <topic-name>
Send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response, which is returned in JSON format.
{ "version": "<the-latest-version-number-of-the-schema>" }
The response includes the following field:
Field | Description |
---|---|
version | The schema version, which is a long number. |
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
Custom schema storage
By default, Pulsar stores various data types of schemas in Apache BookKeeper deployed alongside Pulsar.
However, you can use another storage system if needed.
Implementation
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
SchemaStorage interface
The SchemaStorage interface has the following methods:
public interface SchemaStorage {
    // How schemas are updated
    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
    // How schemas are fetched from storage
    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
    // How schemas are deleted
    CompletableFuture<SchemaVersion> delete(String key);
    // Utility method for converting a schema version byte array to a SchemaVersion object
    SchemaVersion versionFromBytes(byte[] version);
    // Startup behavior for the schema storage client
    void start() throws Exception;
    // Shutdown behavior for the schema storage client
    void close() throws Exception;
}
Tip
For a complete example of schema storage implementation, see BookKeeperSchemaStorage class.
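To make the put, get, and delete contract more concrete, here is a deliberately simplified in-memory sketch in plain Java. It does not implement the SchemaStorage interface: the class InMemorySchemaStore is hypothetical, versions are plain long values standing in for SchemaVersion, stored bytes stand in for StoredSchema, and the hash argument and the start and close lifecycle methods are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in that mirrors the shape of put/get/delete only.
final class InMemorySchemaStore {

    // Each key maps to its list of schema versions; the list index is the version.
    private final Map<String, List<byte[]>> versionsByKey = new HashMap<>();

    // Appends a new version and completes with its version number.
    synchronized CompletableFuture<Long> put(String key, byte[] value) {
        List<byte[]> versions = versionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        versions.add(value.clone());
        return CompletableFuture.completedFuture((long) versions.size() - 1);
    }

    // Completes with the stored bytes, or null when the key or version is unknown.
    synchronized CompletableFuture<byte[]> get(String key, long version) {
        List<byte[]> versions = versionsByKey.get(key);
        if (versions == null || version < 0 || version >= versions.size()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(versions.get((int) version).clone());
    }

    // Removes every version for the key and completes with the last version, or -1 if none.
    synchronized CompletableFuture<Long> delete(String key) {
        List<byte[]> removed = versionsByKey.remove(key);
        long latest = (removed == null) ? -1L : removed.size() - 1L;
        return CompletableFuture.completedFuture(latest);
    }
}
```

A real implementation would talk to an external store and complete the futures asynchronously. Returning already completed futures keeps this sketch short.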
SchemaStorageFactory interface
The SchemaStorageFactory interface has the following method:
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
Tip
For a complete example of schema storage factory implementation, see BookKeeperSchemaStorageFactory class.
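Deployment
To use your custom schema storage implementation, perform the following steps.
1. Package the implementation in a JAR file.
2. Add the JAR file to the lib folder in your Pulsar binary or source distribution.
3. Change the schemaRegistryStorageClassName configuration in broker.conf to your custom factory class.
4. Start Pulsar.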
Set schema compatibility check strategy
You can set the schema compatibility check strategy at the namespace or broker level.
- If you set the strategy at both the namespace and broker levels, Pulsar uses the strategy set at the namespace level.
- If you do not set the strategy at either the namespace or broker level, Pulsar uses the FULL strategy.
- If you set the strategy at the broker level but not at the namespace level, Pulsar uses the strategy set at the broker level.
- If you set the strategy at the namespace level but not at the broker level, Pulsar uses the strategy set at the namespace level.
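The four precedence rules above reduce to "namespace first, then broker, then FULL". A minimal sketch of that resolution in plain Java follows. The class StrategyResolver is hypothetical, and strategy names are modeled as plain strings rather than Pulsar's SchemaCompatibilityStrategy enum.

```java
import java.util.Optional;

// Hypothetical model of how the effective strategy is chosen.
final class StrategyResolver {

    static String resolve(Optional<String> namespaceLevel, Optional<String> brokerLevel) {
        // The namespace setting wins; otherwise fall back to the broker, then to FULL.
        return namespaceLevel.orElse(brokerLevel.orElse("FULL"));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), Optional.of("ALWAYS_INCOMPATIBLE")));
    }
}
```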
Namespace
You can set schema compatibility check strategy at namespace level using one of the following methods.
pulsar-admin
REST API
Java
Use the pulsar-admin namespaces set-schema-compatibility-strategy command.
pulsar-admin namespaces set-schema-compatibility-strategy options
Send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace
Use the setSchemaCompatibilityStrategy method.
admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);
Broker
You can set the schema compatibility check strategy at the broker level by setting schemaCompatibilityStrategy in the broker.conf or standalone.conf file.
Example
schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE