Custom schema storage
默认情况下,Pulsar 在 Apache BookKeeper (与 Pulsar 一起集成部署) 中存储数据类型 schema。 但是, 如果愿意, 你可以使用其他存储系统。 此文档引导你创建自己的schema存储实现。
为了使用Pulsar schema的非默认 (即非BookKeeper) 存储系统, 您需要实现两个 Java 接口: SchemaStorage 和 SchemaStorageFactory。
SchemaStorage 接口
SchemaStorage
接口包含以下方法:
public interface SchemaStorage {
// 如何更新schema
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
// 如何从存储中获取schema
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
// 如何删除schema
CompletableFuture<SchemaVersion> delete(String key);
// 用户将schema字节数据转换为SchemaVersion 对象的工具方法
SchemaVersion versionFromBytes(byte[] version);
// 启动schema存储客户端
void start() throws Exception;
// 关闭schema存储客户端
void close() throws Exception;
}
有关完整的schema存储实现示例, 请参见 BookKeeperSchemaStorage 类。
SchemaStorageFactory接口
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
有关完整的schema storage factory实现例子,请参考BookKeeperSchemaStorageFactory类。
部署
为了适应你自定义的schema存储实现,你需要:
- 将实现打包到 JAR 文件中。
- 将jar添加到Pulsar二进制或源码的
lib
文件夹中。 - 在broker.conf配置中将
schemaRegistryStorageClassName
修改为你自定义的factory类(即SchemaStorageFactory
的实现,而不是SchemaStorage
的实现)。 - 启动Pulsar。