默认情况下, Pulsar在 Apache BookKeeper (与脉冲星一起部署) 中存储数据类型 schema。 但是, 如果愿意, 你可以使用其他存储系统。 此文档引导你创建自己的schema存储实现。
为了使用Pulsar schema的非默认 (即非BookKeeper) 存储系统, 您需要实现两个 Java 接口: SchemaStorage
和 SchemaStorageFactory
。
SchemaStorage 接口
SchemaStorage
接口包含以下方法:
public interface SchemaStorage {
// 如何更新schema
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
// 如何从存储中获取schema
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
// 如何删除schema
CompletableFuture<SchemaVersion> delete(String key);
// 用户将schema字节数据转换为SchemaVersion 对象的工具方法
SchemaVersion versionFromBytes(byte[] version);
// 启动schema存储客户端
void start() throws Exception;
// 关闭schema存储客户端
void close() throws Exception;
}
For a full-fledged example schema storage implementation, see the
BookKeeperSchemaStorage
class.
SchemaStorageFactory接口
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
有关完整的schema storage factory实现例子,请参考
BookKeeperSchemaStorageFactory
类。
部署
为了适应你自定义的schema存储实现,你需要:
- 将实现打包到 JAR 文件中。
- 将jar添加到Pulsar二进制或源码的
lib
文件夹中。 - 在
broker.conf
配置中将schemaRegistryStorageClassName
修改为你自定义的factory类(即SchemaStorageFactory
的实现,而不是SchemaStorage
的实现)。 - 启动Pulsar。
当前内容版权归 Apache Pulsar 或其关联方所有,如需对内容或内容相关联开源项目进行关注与资助,请访问 Apache Pulsar .