Catalogs

Catalogs provide metadata, such as databases, tables, partitions, views, and functions, as well as the information needed to access data stored in a database or other external system.

One of the most crucial aspects of data processing is managing metadata. It may be transient metadata, such as temporary tables or UDFs registered against the table environment, or permanent metadata, such as that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.

Catalogs enable users to reference existing metadata in their data systems and automatically map it to Flink’s corresponding metadata. For example, Flink can map JDBC tables to Flink tables automatically, so users don’t have to manually rewrite DDL statements in Flink. Catalogs greatly simplify the steps required to get started with Flink on users’ existing systems and improve the user experience.

Catalog Types

GenericInMemoryCatalog

The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.

JdbcCatalog

The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol. The Postgres Catalog and the MySQL Catalog are the only two implementations of the JDBC Catalog at the moment. See the JdbcCatalog documentation for more details on setting up the catalog.

HiveCatalog

The HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink’s Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.

The Hive Metastore stores all meta-object names in lower case. This is unlike GenericInMemoryCatalog, which is case-sensitive.
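
The practical effect of the casing difference can be sketched with plain JDK maps. This is only an illustration under assumptions: NameCasing and toLowerCaseName are hypothetical names, and neither Flink nor Hive is involved.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical illustration, not Flink code: case-sensitive vs. lower-cased object names. */
final class NameCasing {

    /** Lower-cases a name the way a store that keeps names in lower case would. */
    static String toLowerCaseName(String name) {
        // Locale.ROOT avoids locale-specific surprises such as the Turkish dotless i.
        return name.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Case-sensitive lookup: "MyTable" and "mytable" are two different keys.
        Map<String, String> caseSensitive = new HashMap<>();
        caseSensitive.put("MyTable", "table metadata");
        System.out.println(caseSensitive.containsKey("mytable")); // prints false

        // Lower-cased store: both spellings resolve to the same key.
        Map<String, String> lowerCased = new HashMap<>();
        lowerCased.put(toLowerCaseName("MyTable"), "table metadata");
        System.out.println(lowerCased.containsKey(toLowerCaseName("MYTABLE"))); // prints true
    }
}
```

Object names that differ only by case can therefore coexist in a case-sensitive catalog but would collide once stored in lower case.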

User-Defined Catalog

Catalogs are pluggable and users can develop custom catalogs by implementing the Catalog interface.

To use custom catalogs with Flink SQL, users should implement a corresponding catalog factory by implementing the CatalogFactory interface. The factory is discovered using Java’s Service Provider Interfaces (SPI). Classes that implement this interface can be added to META-INF/services/org.apache.flink.table.factories.Factory in JAR files. The provided factory identifier is matched against the required type property in a SQL CREATE CATALOG DDL statement.
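
The matching step can be pictured with a small JDK-only model. It is a simplified sketch, not Flink's actual discovery code: FactoryMatching, IdentifiedFactory, select, and the identifier my_catalog are hypothetical, and the list of candidates stands in for whatever SPI discovery would return.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical, simplified model of identifier matching; not Flink's actual implementation. */
final class FactoryMatching {

    /** Minimal stand-in for a factory that exposes an identifier. */
    interface IdentifiedFactory {
        String factoryIdentifier();
    }

    /** Picks the single factory whose identifier equals the 'type' option. */
    static IdentifiedFactory select(List<IdentifiedFactory> discovered, Map<String, String> options) {
        String type = options.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Missing required option 'type'.");
        }
        List<IdentifiedFactory> matches =
                discovered.stream()
                        .filter(factory -> type.equals(factory.factoryIdentifier()))
                        .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory for type '" + type + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // In a real deployment the candidates would come from SPI discovery, not a literal list.
        List<IdentifiedFactory> discovered =
                Arrays.<IdentifiedFactory>asList(() -> "generic_in_memory", () -> "my_catalog");
        Map<String, String> options = Collections.singletonMap("type", "my_catalog");
        System.out.println(select(discovered, options).factoryIdentifier()); // prints my_catalog
    }
}
```

Failing when zero or several factories match is a design choice of this sketch; it keeps an ambiguous or misspelled type value from silently selecting the wrong catalog.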

Since Flink v1.16, TableEnvironment introduces a user class loader to provide consistent class loading behavior in table programs, SQL Client, and SQL Gateway. The user class loader manages all user JARs, such as JARs added by ADD JAR or CREATE FUNCTION .. USING JAR .. statements. User-defined catalogs should load classes with the user class loader instead of Thread.currentThread().getContextClassLoader(). Otherwise, a ClassNotFoundException may be thrown. The user class loader can be accessed via CatalogFactory.Context#getClassLoader.
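
As a rough illustration of this advice, the sketch below resolves a class by name through an explicitly supplied class loader rather than the thread context class loader. It uses only the JDK; UserClassLoading and loadWith are hypothetical names, and the empty URLClassLoader merely stands in for the loader a catalog would obtain from CatalogFactory.Context#getClassLoader.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper, not part of Flink: loads classes through an explicit class loader. */
final class UserClassLoading {

    /** Resolves className with the supplied loader rather than the thread context class loader. */
    static Class<?> loadWith(ClassLoader userClassLoader, String className)
            throws ClassNotFoundException {
        // initialize = true mirrors what a plain Class.forName(String) call would do.
        return Class.forName(className, true, userClassLoader);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the user class loader; a real one would contain the user JARs.
        try (URLClassLoader userClassLoader =
                new URLClassLoader(new URL[0], UserClassLoading.class.getClassLoader())) {
            Class<?> clazz = loadWith(userClassLoader, "java.util.ArrayList");
            System.out.println(clazz.getName()); // prints java.util.ArrayList
        }
    }
}
```

Passing the loader as a parameter keeps the lookup independent of whichever thread happens to execute the catalog code.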

Interface in Catalog for supporting time travel

Starting from version 1.18, the Flink framework supports time travel to query historical data of a table. To make the historical data of a table queryable, users should implement the getTable(ObjectPath tablePath, long timestamp) method of the catalog that the table belongs to.

    public class MyCatalogSupportTimeTravel implements Catalog {
        @Override
        public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
                throws TableNotExistException {
            // Build a schema corresponding to the specific time point.
            Schema schema = buildSchema(timestamp);
            // Set parameters to read data at the corresponding time point.
            Map<String, String> options = buildOptions(timestamp);
            // Build CatalogTable
            CatalogTable catalogTable =
                    CatalogTable.of(schema, "", Collections.emptyList(), options, timestamp);
            return catalogTable;
        }
    }

    public class MyDynamicTableFactory implements DynamicTableSourceFactory {
        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            final ReadableConfig configuration =
                    Configuration.fromMap(context.getCatalogTable().getOptions());
            // Get snapshot from CatalogTable
            final Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
            // Build DynamicTableSource using snapshot options.
            final DynamicTableSource dynamicTableSource = buildDynamicSource(configuration, snapshot);
            return dynamicTableSource;
        }
    }
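
Helpers such as buildSchema(timestamp) and buildOptions(timestamp) are left open in the example above. One possible way to back them, sketched here with JDK classes only, is to keep an ordered index of snapshots and pick the latest one committed at or before the requested time. SnapshotIndex, commit, and snapshotAsOf are hypothetical names, and the sketch assumes the timestamp is expressed in epoch milliseconds.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical snapshot index, not part of Flink: maps commit times to snapshot ids. */
final class SnapshotIndex {
    // Key: commit time in epoch milliseconds; value: id of the snapshot committed at that time.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void commit(long commitTimeMillis, String snapshotId) {
        snapshots.put(commitTimeMillis, snapshotId);
    }

    /** Returns the latest snapshot committed at or before the given timestamp, if any. */
    Optional<String> snapshotAsOf(long timestampMillis) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(timestampMillis);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        SnapshotIndex index = new SnapshotIndex();
        index.commit(1_000L, "snapshot-1");
        index.commit(2_000L, "snapshot-2");
        System.out.println(index.snapshotAsOf(1_500L)); // prints Optional[snapshot-1]
        System.out.println(index.snapshotAsOf(500L));   // prints Optional.empty
    }
}
```

A request that predates the first snapshot yields an empty result, which a catalog could translate into a TableNotExistException or a similar error.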

Using SQL DDL

Users can use SQL DDL to create tables in catalogs in both the Table API and SQL.

Java

    TableEnvironment tableEnv = ...;
    // Create a HiveCatalog
    Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);
    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
    tableEnv.listTables(); // should return the tables in current catalog and database.

Scala

    val tableEnv = ...
    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)
    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
    tableEnv.listTables() // should return the tables in current catalog and database.

Python

    from pyflink.table.catalog import HiveCatalog
    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
    # Register the catalog
    t_env.register_catalog("myhive", catalog)
    # Create a catalog database
    t_env.execute_sql("CREATE DATABASE mydb WITH (...)")
    # Create a catalog table
    t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
    # should return the tables in current catalog and database.
    t_env.list_tables()

SQL Client

    // the catalog should have been registered via yaml file
    Flink SQL> CREATE DATABASE mydb WITH (...);
    Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
    Flink SQL> SHOW TABLES;
    mytable

For detailed information, please check out Flink SQL CREATE DDL.

Using Java, Scala or Python

Users can use Java, Scala or Python to create catalog tables programmatically.

Java

    import org.apache.flink.table.api.*;
    import org.apache.flink.table.catalog.*;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import java.util.List;

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Create a HiveCatalog
    Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);
    // Create a catalog database (the last argument is ignoreIfExists)
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    // Create a catalog table
    final Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build();
    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
        .schema(schema)
        // …
        .build());
    List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

Scala

    import org.apache.flink.table.api._
    import org.apache.flink.table.catalog._
    import org.apache.flink.table.catalog.hive.HiveCatalog

    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)
    // Create a catalog database (the last argument is ignoreIfExists)
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false)
    // Create a catalog table
    val schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build()
    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
        .schema(schema)
        // …
        .build())
    val tables = catalog.listTables("mydb") // tables should contain "mytable"

Python

    from pyflink.table import *
    from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable

    settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(settings)
    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
    # Register the catalog
    t_env.register_catalog("myhive", catalog)
    # Create a catalog database (the last argument is ignore_if_exists)
    database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", database, False)
    # Create a catalog table
    schema = Schema.new_builder() \
        .column("name", DataTypes.STRING()) \
        .column("age", DataTypes.INT()) \
        .build()
    catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
        .schema(schema)
        # …
        .build())
    # tables should contain "mytable"
    tables = catalog.list_tables("mydb")

Catalog API

Note: only the programmatic catalog APIs are listed here. Users can achieve much of the same functionality with SQL DDL. For detailed DDL information, please refer to SQL CREATE DDL.

Database operations

Java/Scala

    // create database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    // drop database
    catalog.dropDatabase("mydb", false);
    // alter database
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
    // get database
    catalog.getDatabase("mydb");
    // check if a database exists
    catalog.databaseExists("mydb");
    // list databases in a catalog
    catalog.listDatabases();

Python

    from pyflink.table.catalog import CatalogDatabase

    # create database
    catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", catalog_database, False)
    # drop database
    catalog.drop_database("mydb", False)
    # alter database
    catalog.alter_database("mydb", catalog_database, False)
    # get database
    catalog.get_database("mydb")
    # check if a database exists
    catalog.database_exists("mydb")
    # list databases in a catalog
    catalog.list_databases()

Table operations

Java/Scala

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
    // get table
    catalog.getTable(new ObjectPath("mydb", "mytable"));
    // check if a table exists or not
    catalog.tableExists(new ObjectPath("mydb", "mytable"));
    // list tables in a database
    catalog.listTables("mydb");

Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath
    from pyflink.table.descriptors import Kafka

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()
    table_properties = Kafka() \
        .version("0.11") \
        .start_from_earliest() \
        .to_properties()
    catalog_table = CatalogBaseTable.create_table(schema=table_schema, properties=table_properties, comment="my comment")
    # create table
    catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)
    # drop table
    catalog.drop_table(ObjectPath("mydb", "mytable"), False)
    # alter table
    catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)
    # rename table
    catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table", False)
    # get table
    catalog.get_table(ObjectPath("mydb", "mytable"))
    # check if a table exists or not
    catalog.table_exists(ObjectPath("mydb", "mytable"))
    # list tables in a database
    catalog.list_tables("mydb")

View operations

Java/Scala

    // create view
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    // drop view
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);
    // alter view
    catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    // rename view
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
    // get view
    catalog.getTable(new ObjectPath("mydb", "myview"));
    // check if a view exists or not
    catalog.tableExists(new ObjectPath("mydb", "myview"));
    // list views in a database
    catalog.listViews("mydb");

Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()
    catalog_table = CatalogBaseTable.create_view(
        original_query="select * from t1",
        expanded_query="select * from test-catalog.db1.t1",
        schema=table_schema,
        properties={},
        comment="This is a view"
    )
    # create view
    catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)
    # drop view
    catalog.drop_table(ObjectPath("mydb", "myview"), False)
    # alter view
    catalog.alter_table(ObjectPath("mydb", "myview"), catalog_table, False)
    # rename view
    catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view", False)
    # get view
    catalog.get_table(ObjectPath("mydb", "myview"))
    # check if a view exists or not
    catalog.table_exists(ObjectPath("mydb", "myview"))
    # list views in a database
    catalog.list_views("mydb")

Partition operations

Java/Scala

    // create partition
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    // alter partition
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // check if a partition exists or not
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    // list partitions of a table under a given partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // list partitions of a table by expression filter
    catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));

Python

    from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition

    catalog_partition = CatalogPartition.create_instance({}, "my partition")
    catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})
    # create partition
    catalog.create_partition(
        ObjectPath("mydb", "mytable"),
        catalog_partition_spec,
        catalog_partition,
        False)
    # drop partition
    catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)
    # alter partition
    catalog.alter_partition(
        ObjectPath("mydb", "mytable"),
        catalog_partition_spec,
        catalog_partition,
        False)
    # get partition
    catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)
    # check if a partition exists or not
    catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)
    # list partitions of a table
    catalog.list_partitions(ObjectPath("mydb", "mytable"))
    # list partitions of a table under a given partition spec
    catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)

Function operations

Java/Scala

    // create function
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    // get function
    catalog.getFunction(new ObjectPath("mydb", "myfunc"));
    // check if a function exists or not
    catalog.functionExists(new ObjectPath("mydb", "myfunc"));
    // list functions in a database
    catalog.listFunctions("mydb");

Python

    from pyflink.table.catalog import ObjectPath, CatalogFunction

    catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")
    # create function
    catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
    # drop function
    catalog.drop_function(ObjectPath("mydb", "myfunc"), False)
    # alter function
    catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
    # get function
    catalog.get_function(ObjectPath("mydb", "myfunc"))
    # check if a function exists or not
    catalog.function_exists(ObjectPath("mydb", "myfunc"))
    # list functions in a database
    catalog.list_functions("mydb")

Table API and SQL for Catalog

Registering a Catalog

Users always have access to an in-memory catalog named default_catalog, which is created automatically. This catalog has a single database called default_database. Users can also register additional catalogs into an existing Flink session.

Java/Scala

    tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));

Python

    t_env.register_catalog("myCatalog", catalog)

YAML

All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box.

    Catalog            Type Value
    GenericInMemory    generic_in_memory
    Hive               hive

    catalogs:
      - name: myCatalog
        type: custom_catalog
        hive-conf-dir: ...

Changing the Current Catalog And Database

Flink will always search for tables, views, and UDFs in the current catalog and database.

Java/Scala

    tableEnv.useCatalog("myCatalog");
    tableEnv.useDatabase("myDb");

Python

    t_env.use_catalog("myCatalog")
    t_env.use_database("myDb")

SQL

    Flink SQL> USE CATALOG myCatalog;
    Flink SQL> USE myDb;

Metadata from catalogs other than the current catalog is accessible by providing fully qualified names in the form catalog.database.object.

Java/Scala

    tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

Python

    t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")

SQL

    Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
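
How the current catalog and database complete a partially qualified name can be sketched as follows. This is a simplified model rather than Flink's resolver: NameResolution and qualify are hypothetical names, and quoting with backticks or names containing dots are deliberately ignored.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of name qualification; not Flink's actual resolver. */
final class NameResolution {

    /** Expands a 1-, 2-, or 3-part name into [catalog, database, object]. */
    static List<String> qualify(String path, String currentCatalog, String currentDatabase) {
        String[] parts = path.split("\\.");
        switch (parts.length) {
            case 1: // object only: use the current catalog and database
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: // database.object: use the current catalog
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: // already fully qualified
                return Arrays.asList(parts);
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts but got: " + path);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualify("my_table", "myCatalog", "myDb"));
        // prints [myCatalog, myDb, my_table]
        System.out.println(qualify("other_catalog.other_db.my_table", "myCatalog", "myDb"));
        // prints [other_catalog, other_db, my_table]
    }
}
```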

List Available Catalogs

Java/Scala

    tableEnv.listCatalogs();

Python

    t_env.list_catalogs()

SQL

    Flink SQL> show catalogs;

List Available Databases

Java/Scala

    tableEnv.listDatabases();

Python

    t_env.list_databases()

SQL

    Flink SQL> show databases;

List Available Tables

Java/Scala

    tableEnv.listTables();

Python

    t_env.list_tables()

SQL

    Flink SQL> show tables;

Catalog Modification Listener

Flink supports registering customized listeners for catalog modifications, such as database and table DDL. Flink creates a CatalogModificationEvent for each DDL operation and notifies the CatalogModificationListener. You can implement a listener and perform customized operations when receiving the event, such as reporting the information to an external metadata system.

Implement Catalog Listener

There are two interfaces for the catalog modification listener: CatalogModificationListenerFactory, which creates the listener, and CatalogModificationListener, which receives and processes the event. You need to implement both interfaces; an example follows.

    /** Factory used to create a {@link CatalogModificationListener} instance. */
    public class YourCatalogListenerFactory implements CatalogModificationListenerFactory {
        /** The identifier for the customized listener factory; you can name it yourself. */
        private static final String IDENTIFIER = "your_factory";

        @Override
        public String factoryIdentifier() {
            return IDENTIFIER;
        }

        @Override
        public CatalogModificationListener createListener(Context context) {
            // createHttpClient is a placeholder: create the HTTP client from the context here.
            return new YourCatalogListener(createHttpClient(context));
        }
    }

    /** Customized catalog modification listener. */
    public class YourCatalogListener implements CatalogModificationListener {
        private final HttpClient client;

        YourCatalogListener(HttpClient client) {
            this.client = client;
        }

        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Report the database and table information via http client.
        }
    }
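
The notification flow behind these two interfaces can be pictured with a small JDK-only model: each modification produces one event, and every registered listener receives it. This is a hypothetical sketch, not Flink's implementation; ListenerDispatch, register, and publish are illustrative names, and events are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of listener notification; not Flink's actual implementation. */
final class ListenerDispatch {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Adds a listener; listeners are notified in registration order. */
    void register(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Delivers one event to every registered listener. */
    void publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        ListenerDispatch dispatch = new ListenerDispatch();
        List<String> reported = new ArrayList<>();
        // Stand-in for a listener that reports events to an external metadata system.
        dispatch.register(reported::add);
        dispatch.publish("CREATE TABLE mydb.mytable");
        System.out.println(reported); // prints [CREATE TABLE mydb.mytable]
    }
}
```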

Register Catalog Listener

After implementing the catalog modification factory and listener above, you can register them with the table environment.

    Configuration configuration = new Configuration();
    // Add the factory identifier; you can set multiple listeners in the configuration.
    configuration.set(TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList("your_factory"));
    TableEnvironment env = TableEnvironment.create(
        EnvironmentSettings.newInstance()
            .withConfiguration(configuration)
            .build());
    // Create/Alter/Drop database and table.
    env.executeSql("CREATE TABLE ...").await();

For SQL Gateway, you can add the option table.catalog-modification.listeners to flink-conf.yaml before starting the gateway. Alternatively, you can use SET to specify the listener for DDL statements, for example in SQL Client or the JDBC driver.

    Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
    Flink SQL> CREATE TABLE test_table(...);

Catalog Store

Catalog Store is used to store the configuration of catalogs. When a Catalog Store is used, the configurations of catalogs created in the session are persisted in the external system that backs the Catalog Store. Even if the session is recreated, previously created catalogs can still be retrieved from the Catalog Store.

Configure Catalog Store

Users can configure the Catalog Store in different ways: one is to use the Table API, and another is to use YAML configuration.

Register a catalog store using catalog store instance:

    // Initialize a catalog Store instance
    CatalogStore catalogStore = new FileCatalogStore("file:///path/to/catalog/store/");
    // set up the catalog store
    final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inBatchMode()
            .withCatalogStore(catalogStore)
            .build();

Register a catalog store using configuration:

    // Set up configuration
    Configuration configuration = new Configuration();
    // String keys require setString; set(...) expects a typed ConfigOption.
    configuration.setString("table.catalog-store.kind", "file");
    configuration.setString("table.catalog-store.file.path", "file:///path/to/catalog/store/");
    // set up the configuration.
    final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inBatchMode()
            .withConfiguration(configuration)
            .build();
    final TableEnvironment tableEnv = TableEnvironment.create(settings);

In SQL Gateway, it is recommended to configure the settings in a yaml file so that all sessions can automatically use the pre-created Catalog. Usually, you need to configure the kind of Catalog Store and other required parameters for the Catalog Store.

    table.catalog-store.kind: file
    table.catalog-store.file.path: file:///path/to/catalog/store/

Catalog Store Type

Flink has two built-in Catalog Stores, namely GenericInMemoryCatalogStore and FileCatalogStore, but the Catalog Store model is extendable, so users can also implement their own custom Catalog Store.

GenericInMemoryCatalogStore

GenericInMemoryCatalogStore is an implementation of CatalogStore that saves configuration information in memory. All catalog configurations are available only within the session’s lifecycle, and the stored catalog configurations are automatically cleared after the session is closed.

By default, if no Catalog Store related configuration is specified, the system uses this implementation.

FileCatalogStore

FileCatalogStore can save the Catalog configuration to a file. To use FileCatalogStore, you need to specify the directory where the Catalog configurations should be saved. Each Catalog has its own file, named after the Catalog.

The FileCatalogStore implementation supports both local and remote file systems that are available via the Flink FileSystem abstraction. If the given Catalog Store path does not exist, either completely or partly, FileCatalogStore will try to create the missing directories.

If the given Catalog Store path does not exist and FileCatalogStore fails to create the directory, the Catalog Store cannot be initialized and an exception is thrown. If the FileCatalogStore initialization fails, neither SQL Client nor SQL Gateway will work.

Here is an example directory structure representing the storage of Catalog configurations using FileCatalogStore:

    - /path/to/save/the/catalog/
        - catalog1.yaml
        - catalog2.yaml
        - catalog3.yaml
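
The layout above (one file per catalog, with missing directories created on demand) can be mimicked with java.nio.file. This is only a local-file-system sketch under assumptions: the real FileCatalogStore goes through the Flink FileSystem abstraction, FileStoreLayout and store are hypothetical names, and the YAML content written here is a placeholder rather than the store's actual file format.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of a one-file-per-catalog layout on the local file system. */
final class FileStoreLayout {

    /** Writes the configuration of one catalog to <storeDir>/<catalogName>.yaml. */
    static Path store(Path storeDir, String catalogName, String yamlContent) throws IOException {
        // Creates any missing parent directories; does nothing if the directory already exists.
        Files.createDirectories(storeDir);
        Path file = storeDir.resolve(catalogName + ".yaml");
        Files.write(file, yamlContent.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(String[] args) throws IOException {
        // A nested path below a temp directory shows that missing directories are created.
        Path storeDir = Files.createTempDirectory("catalog-store").resolve("path").resolve("to");
        Path file = store(storeDir, "catalog1", "type: generic_in_memory\n");
        System.out.println(file.getFileName()); // prints catalog1.yaml
    }
}
```

If directory creation fails, Files.createDirectories throws an IOException, which corresponds to the initialization failure described above.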

Catalog Store Configuration

The following options can be used to adjust the Catalog Store behavior.

    Key                             Default               Type     Description
    table.catalog-store.kind        "generic_in_memory"   String   The kind of catalog store to be used. Out of the box, 'generic_in_memory' and 'file' options are supported.
    table.catalog-store.file.path   (none)                String   The configuration option for specifying the path to the file catalog store root directory.

Custom Catalog Store

Catalog Store is extensible, and users can customize Catalog Store by implementing its interface. If SQL CLI or SQL Gateway needs to use Catalog Store, the corresponding CatalogStoreFactory interface also needs to be implemented for this Catalog Store.

    public class CustomCatalogStoreFactory implements CatalogStoreFactory {
        public static final String IDENTIFIER = "custom-kind";

        // Used to connect external storage systems
        private CustomClient client;

        @Override
        public CatalogStore createCatalogStore() {
            // Pass the client initialized in open() to the store.
            return new CustomCatalogStore(client);
        }

        @Override
        public void open(Context context) throws CatalogException {
            // initialize the resources, such as http client
            client = initClient(context);
        }

        @Override
        public void close() throws CatalogException {
            // release the resources
        }

        @Override
        public String factoryIdentifier() {
            // table store kind identifier
            return IDENTIFIER;
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            // define the required options
            Set<ConfigOption<?>> options = new HashSet<>();
            options.add(OPTION_1);
            options.add(OPTION_2);
            return options;
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            // define the optional options (none in this example)
            return Collections.emptySet();
        }
    }

    public class CustomCatalogStore extends AbstractCatalogStore {
        private final CustomClient client;

        public CustomCatalogStore(CustomClient client) {
            this.client = client;
        }

        @Override
        public void storeCatalog(String catalogName, CatalogDescriptor catalog)
                throws CatalogException {
            // store the catalog
        }

        @Override
        public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
                throws CatalogException {
            // remove the catalog descriptor
        }

        @Override
        public Optional<CatalogDescriptor> getCatalog(String catalogName) {
            // retrieve the catalog configuration and build the catalog descriptor
            return Optional.empty(); // placeholder result
        }

        @Override
        public Set<String> listCatalogs() {
            // list all catalogs
            return Collections.emptySet(); // placeholder result
        }

        @Override
        public boolean contains(String catalogName) {
            // check whether the catalog is stored
            return false; // placeholder result
        }
    }