Custom schema storage

默认情况下,Pulsar 在 Apache BookKeeper (与 Pulsar 一起集成部署) 中存储数据类型 schema。 但是, 如果愿意, 你可以使用其他存储系统。 此文档引导你创建自己的schema存储实现。

为了使用Pulsar schema的非默认 (即非BookKeeper) 存储系统, 您需要实现两个 Java 接口: SchemaStorageSchemaStorageFactory

SchemaStorage 接口

SchemaStorage接口包含以下方法:

  1. public interface SchemaStorage {
  2. // 如何更新schema
  3. CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
  4. // 如何从存储中获取schema
  5. CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
  6. // 如何删除schema
  7. CompletableFuture<SchemaVersion> delete(String key);
  8. // 用户将schema字节数据转换为SchemaVersion 对象的工具方法
  9. SchemaVersion versionFromBytes(byte[] version);
  10. // 启动schema存储客户端
  11. void start() throws Exception;
  12. // 关闭schema存储客户端
  13. void close() throws Exception;
  14. }

有关完整的schema存储实现示例, 请参见 BookKeeperSchemaStorage 类。

SchemaStorageFactory接口

  1. public interface SchemaStorageFactory {
  2. @NotNull
  3. SchemaStorage create(PulsarService pulsar) throws Exception;
  4. }

有关完整的schema storage factory实现例子,请参考BookKeeperSchemaStorageFactory类。

部署

为了适应你自定义的schema存储实现,你需要:

  1. 将实现打包到 JAR 文件中。
  2. 将jar添加到Pulsar二进制或源码lib文件夹中。
  3. broker.conf配置中将schemaRegistryStorageClassName修改为你自定义的factory类(即SchemaStorageFactory的实现,而不是SchemaStorage的实现)。
  4. 启动Pulsar。