Custom schema storage

By default, Pulsar stores data type schemas in Apache BookKeeper (which is deployed alongside Pulsar). You can, however, use another storage system if you wish. This doc walks you through creating your own schema storage implementation.

In order to use a non-default (i.e. non-BookKeeper) storage system for Pulsar schemas, you need to implement two Java interfaces: SchemaStorage and SchemaStorageFactory.

SchemaStorage interface

The SchemaStorage interface has the following methods:

  1. public interface SchemaStorage {
  2. // How schemas are updated
  3. CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
  4. // How schemas are fetched from storage
  5. CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
  6. // How schemas are deleted
  7. CompletableFuture<SchemaVersion> delete(String key);
  8. // Utility method for converting a schema version byte array to a SchemaVersion object
  9. SchemaVersion versionFromBytes(byte[] version);
  10. // Startup behavior for the schema storage client
  11. void start() throws Exception;
  12. // Shutdown behavior for the schema storage client
  13. void close() throws Exception;
  14. }

For a full-fledged example schema storage implementation, see the BookKeeperSchemaStorage class.

SchemaStorageFactory interface

  1. public interface SchemaStorageFactory {
  2. @NotNull
  3. SchemaStorage create(PulsarService pulsar) throws Exception;
  4. }

For a full-fledged example schema storage factory implementation, see the BookKeeperSchemaStorageFactory class.

Deployment

In order to use your custom schema storage implementation, you’ll need to:

  1. Package the implementation in a JAR file.
  2. Add that jar to the lib folder in your Pulsar binary or source distribution.
  3. Change the schemaRegistryStorageClassName configuration in broker.conf to your custom factory class (i.e. the SchemaStorageFactory implementation, not the SchemaStorage implementation).
  4. Start up Pulsar.