Creating and Modifying Distributed Tables (DDL)

Creating and Distributing Tables

To create a distributed table, first define its schema with a CREATE TABLE statement, exactly as you would for a regular PostgreSQL table.

  CREATE TABLE github_events
  (
      event_id bigint,
      event_type text,
      event_public boolean,
      repo_id bigint,
      payload jsonb,
      repo jsonb,
      actor jsonb,
      org jsonb,
      created_at timestamp
  );

Next, you can use the create_distributed_table() function to specify the table distribution column and create the worker shards.

  SELECT create_distributed_table('github_events', 'repo_id');

This function informs Citus that the github_events table should be distributed on the repo_id column (by hashing the column value). The function also creates shards on the worker nodes using the citus.shard_count and citus.shard_replication_factor configuration values.

In this example, Citus creates citus.shard_count shards in total. Each shard owns a portion of the hash token space and is replicated according to the citus.shard_replication_factor configuration value. The shard replicas created on the workers have the same table schema, index, and constraint definitions as the table on the coordinator. Once the replicas are created, the function saves all distributed metadata on the coordinator.
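As a rough sketch of how these settings come into play: both are ordinary configuration parameters, so you can inspect them and change them for the current session before distributing a table. The value 64 below is an arbitrary illustration, not a recommendation, and the final call stands in for the create_distributed_table call shown earlier rather than following it.

```sql
-- Inspect the values that create_distributed_table() will pick up.
SHOW citus.shard_count;
SHOW citus.shard_replication_factor;

-- Assumption: 64 is an arbitrary example value for this session.
SET citus.shard_count = 64;

-- Shards are created using the settings in effect at this point.
SELECT create_distributed_table('github_events', 'repo_id');
```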

Each created shard is assigned a unique shard id, and all its replicas have the same shard id. Each shard is represented on the worker node as a regular PostgreSQL table named ‘tablename_shardid’, where tablename is the name of the distributed table and shardid is the unique id assigned to that shard. You can connect to the PostgreSQL instances on the workers to view or run commands on individual shards.
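To see which shard tables to expect on the workers, you can query the shard metadata on the coordinator. The sketch below assumes the pg_dist_shard metadata table (not otherwise covered in this section) and simply rebuilds the ‘tablename_shardid’ name described above. If the table is not on your search_path, the generated name will include the schema.

```sql
-- Run on the coordinator: one row per shard of github_events.
SELECT shardid,
       logicalrelid::text || '_' || shardid AS shard_table_name,
       shardminvalue,   -- lower bound of the hash range owned by the shard
       shardmaxvalue    -- upper bound of the hash range owned by the shard
FROM pg_dist_shard
WHERE logicalrelid = 'github_events'::regclass
ORDER BY shardid;
```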

You are now ready to insert data into the distributed table and run queries on it. You can also learn more about the UDF used in this section in the Citus Utility Functions section of our documentation.

Reference Tables

The above method distributes tables into multiple horizontal shards, but another possibility is distributing tables into a single shard and replicating the shard to every worker node. Tables distributed this way are called reference tables. They are used to store data that needs to be frequently accessed by multiple nodes in a cluster.

Common candidates for reference tables include:

  • Smaller tables which need to join with larger distributed tables.

  • Tables in multi-tenant apps which lack a tenant id column or which aren’t associated with a tenant. (In some cases, to reduce migration effort, users might even choose to make reference tables out of tables associated with a tenant but which currently lack a tenant id.)

  • Tables which need unique constraints across multiple columns and are small enough.

For instance, suppose a multi-tenant eCommerce site needs to calculate sales tax for transactions in any of its stores. Tax information isn’t specific to any tenant, so it makes sense to consolidate it in a shared table. A US-centric reference table might look like this:

  -- a reference table
  CREATE TABLE states (
      code char(2) PRIMARY KEY,
      full_name text NOT NULL,
      general_sales_tax numeric(4,3)
  );
  -- distribute it to all workers
  SELECT create_reference_table('states');

Now queries, such as one calculating tax for a shopping cart, can join on the states table with no network overhead, and distributed tables can add a foreign key to the state code for better validation.
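A minimal sketch of such a join follows. The cart_items table, its columns, and the literal ids are hypothetical and exist only for illustration. The sketch assumes states has already been made a reference table as shown above.

```sql
-- Hypothetical distributed table that references the states reference table.
CREATE TABLE cart_items (
    store_id   bigint,
    cart_id    bigint,
    item_id    bigint,
    price      numeric(10,2) NOT NULL,
    state_code char(2) REFERENCES states (code),
    PRIMARY KEY (store_id, cart_id, item_id)
);
SELECT create_distributed_table('cart_items', 'store_id');

-- Sales tax for one cart. Every worker holds a copy of states, so the
-- join can be evaluated locally next to the cart_items shard.
SELECT c.cart_id,
       sum(c.price * s.general_sales_tax) AS sales_tax
FROM cart_items c
JOIN states s ON s.code = c.state_code
WHERE c.store_id = 42
  AND c.cart_id = 7
GROUP BY c.cart_id;
```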

In addition to distributing a table as a single replicated shard, the create_reference_table UDF marks it as a reference table in the Citus metadata tables. Citus automatically performs two-phase commits (2PC) for modifications to tables marked this way, which provides strong consistency guarantees.

If you have an existing distributed table, you can change it to a reference table by running:

  SELECT undistribute_table('table_name');
  SELECT create_reference_table('table_name');

For another example of using reference tables in a multi-tenant application, see Sharing Data Between Tenants.

Distributing Coordinator Data

If an existing PostgreSQL database is converted into the coordinator node for a Citus cluster, the data in its tables can be distributed efficiently and with minimal interruption to an application.

The create_distributed_table function described earlier works on both empty and non-empty tables. For a non-empty table it automatically distributes the existing rows throughout the cluster, and reports this with the message “NOTICE: Copying data from local table…” For example:

  CREATE TABLE series AS SELECT i FROM generate_series(1,1000000) i;
  SELECT create_distributed_table('series', 'i');
  NOTICE: Copying data from local table...
  NOTICE: copying the data has completed
  DETAIL: The local data in the table is no longer visible, but is still on disk.
  HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.series$$)
   create_distributed_table
  --------------------------
  (1 row)

Writes on the table are blocked while the data is migrated, and pending writes are handled as distributed queries once the function commits. (If the function fails then the queries become local again.) Reads can continue as normal and will become distributed queries once the function commits.

When distributing tables A and B, where A has a foreign key to B, distribute the key destination table B first. Doing it in the wrong order will cause an error:

  ERROR: cannot create foreign key constraint
  DETAIL: Referenced table must be a distributed table or a reference table.

If it’s not possible to distribute in the correct order, then drop the foreign keys, distribute the tables, and recreate the foreign keys.
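The drop-and-recreate fallback might look like the sketch below. The constraint name a_b_fk and the columns tenant_id, b_id, and id are hypothetical. The sketch also assumes that b has a unique constraint or primary key on (tenant_id, id) and that both tables end up co-located.

```sql
-- 1. Drop the foreign key that blocks distribution.
ALTER TABLE a DROP CONSTRAINT a_b_fk;

-- 2. Distribute both tables on the same (hypothetical) column.
SELECT create_distributed_table('b', 'tenant_id');
SELECT create_distributed_table('a', 'tenant_id');

-- 3. Recreate the key, including the distribution column.
ALTER TABLE a ADD CONSTRAINT a_b_fk
    FOREIGN KEY (tenant_id, b_id) REFERENCES b (tenant_id, id);
```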

After the tables are distributed, use the truncate_local_data_after_distributing_table function to remove local data. Leftover local data in distributed tables is inaccessible to Citus queries, and can cause irrelevant constraint violations on the coordinator.
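For the series table from the example above, the cleanup is the call suggested by the HINT in the output:

```sql
-- Removes the leftover local rows on the coordinator; the distributed
-- data in the shards is not affected.
SELECT truncate_local_data_after_distributing_table($$public.series$$);
```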

When migrating data from an external database, such as from Amazon RDS to Citus Cloud, first create the Citus distributed tables via create_distributed_table, then copy the data into the table. Copying into distributed tables avoids running out of space on the coordinator node.
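One way to do this is sketched below, assuming the data was exported from the source database to a CSV file and that you are connected to the coordinator with psql. The file name github_events.csv is hypothetical, and the github_events table is assumed to exist but to be empty and not yet distributed.

```sql
-- Distribute the table while it is still empty.
SELECT create_distributed_table('github_events', 'repo_id');

-- \copy is a psql meta-command: it reads the file on the client machine
-- and streams the rows to the coordinator, which routes them to the shards.
\copy github_events FROM 'github_events.csv' WITH (FORMAT csv)
```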

Co-Locating Tables

Co-location is the practice of dividing data tactically, keeping related information on the same machines to enable efficient relational operations, while taking advantage of the horizontal scalability for the whole dataset. For more information and examples see Table Co-Location.

Tables are co-located in groups. To manually control a table’s co-location group assignment, use the optional colocate_with parameter of create_distributed_table. If you don’t care about a table’s co-location, omit this parameter. It defaults to the value 'default', which groups the table with any other default co-location table having the same distribution column type, shard count, and replication factor. If you want to break or update this implicit co-location, you can use update_distributed_table_colocation().

  -- these tables are implicitly co-located by using the same
  -- distribution column type and shard count with the default
  -- co-location group
  SELECT create_distributed_table('A', 'some_int_col');
  SELECT create_distributed_table('B', 'other_int_col');
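If you later need to change that implicit grouping, a sketch using update_distributed_table_colocation() could look like this. It assumes the function accepts the same colocate_with values as create_distributed_table, and that A and B still share the distribution column type and shard count when they are regrouped.

```sql
-- Move A out of the implicit group into a co-location group of its own.
SELECT update_distributed_table_colocation('A', colocate_with => 'none');

-- Later, explicitly co-locate A with B again.
SELECT update_distributed_table_colocation('A', colocate_with => 'B');
```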

When a new table is not related to others in its would-be implicit co-location group, specify colocate_with => 'none'.

  -- not co-located with other tables
  SELECT create_distributed_table('A', 'foo', colocate_with => 'none');

Splitting unrelated tables into their own co-location groups will improve shard rebalancing performance, because shards in the same group have to be moved together.

When tables are indeed related (for instance when they will be joined), it can make sense to explicitly co-locate them. The gains of appropriate co-location are more important than any rebalancing overhead.

To explicitly co-locate multiple tables, distribute one and then put the others into its co-location group. For example:

  -- distribute stores
  SELECT create_distributed_table('stores', 'store_id');
  -- add to the same group as stores
  SELECT create_distributed_table('orders', 'store_id', colocate_with => 'stores');
  SELECT create_distributed_table('products', 'store_id', colocate_with => 'stores');

Information about co-location groups is stored in the pg_dist_colocation table, while pg_dist_partition reveals which tables are assigned to which groups.
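As an illustration, the two tables can be joined to list every table next to the properties of its group. The column names used here (logicalrelid, colocationid, shardcount, replicationfactor, distributioncolumntype) are assumed from the Citus metadata schema and may differ between versions.

```sql
-- Run on the coordinator: tables that share a colocationid are co-located.
SELECT p.logicalrelid                    AS table_name,
       p.colocationid,
       c.shardcount,
       c.replicationfactor,
       c.distributioncolumntype::regtype AS distribution_column_type
FROM pg_dist_partition p
JOIN pg_dist_colocation c ON c.colocationid = p.colocationid
ORDER BY p.colocationid, p.logicalrelid::text;
```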

Upgrading from Citus 5.x

Starting with Citus 6.0, we made co-location a first-class concept, and started tracking tables’ assignment to co-location groups in pg_dist_colocation. Since Citus 5.x didn’t have this concept, tables created with Citus 5 were not explicitly marked as co-located in metadata, even when the tables were physically co-located.

Since Citus uses co-location metadata information for query optimization and pushdown, it becomes critical to inform Citus of this co-location for previously created tables. To fix the metadata, simply mark the tables as co-located using mark_tables_colocated:

  -- Assume that stores, products and line_items were created in a Citus 5.x database.
  -- Put products and line_items into store's co-location group
  SELECT mark_tables_colocated('stores', ARRAY['products', 'line_items']);

This function requires the tables to be distributed with the same method, column type, number of shards, and replication method. It doesn’t re-shard or physically move data; it merely updates Citus metadata.

Dropping Tables

You can use the standard PostgreSQL DROP TABLE command to remove your distributed tables. As with regular tables, DROP TABLE removes any indexes, rules, triggers, and constraints that exist for the target table. In addition, it also drops the shards on the worker nodes and cleans up their metadata.

  DROP TABLE github_events;

Modifying Tables

Citus automatically propagates many kinds of DDL statements, which means that modifying a distributed table on the coordinator node will update shards on the workers too. Other DDL statements require manual propagation, and certain others are prohibited, such as those which would modify a distribution column. Attempting to run DDL that is ineligible for automatic propagation will raise an error and leave tables on the coordinator node unchanged.

Here is a reference of the categories of DDL statements which propagate. Note that automatic propagation can be enabled or disabled with a configuration parameter.
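The parameter is not named in this section. The sketch below assumes it is citus.enable_ddl_propagation and shows how it could be inspected and toggled for a session. Turning it off is assumed to mean that later DDL is no longer forwarded to the worker shards, so treat this as an illustration rather than a recommended practice.

```sql
-- Check whether DDL run on the coordinator is forwarded to the workers.
SHOW citus.enable_ddl_propagation;

-- Assumption: disables automatic propagation for the current session only.
SET citus.enable_ddl_propagation TO off;

-- Restore automatic propagation.
SET citus.enable_ddl_propagation TO on;
```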

Adding/Modifying Columns

Citus propagates most ALTER TABLE commands automatically. Adding columns or changing their default values works as it would in a single-machine PostgreSQL database:

  -- Adding a column
  ALTER TABLE products ADD COLUMN description text;
  -- Changing default value
  ALTER TABLE products ALTER COLUMN price SET DEFAULT 7.77;

Significant changes to an existing column like renaming it or changing its data type are fine too. However, the data type of the distribution column cannot be altered. This column determines how table data distributes through the Citus cluster, and modifying its data type would require moving the data.

Attempting to do so causes an error:

  -- assuming store_id is the distribution column
  -- for products, and that it has type integer
  ALTER TABLE products
      ALTER COLUMN store_id TYPE text;
  /*
  ERROR: cannot execute ALTER TABLE command involving partition column
  */

As a workaround, you can consider changing the distribution column to a different column, altering the original column, and then changing the distribution column back.
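A hedged sketch of that workaround is shown below. It assumes the alter_distributed_table function is available in your Citus version, that products has another column (here a hypothetical id) able to serve temporarily as the distribution column, and that no primary key, unique constraint, or co-location requirement stands in the way. Each alter_distributed_table call redistributes the table's data, so this can be expensive on large tables.

```sql
-- 1. Temporarily distribute products by a different (hypothetical) column.
SELECT alter_distributed_table('products', distribution_column := 'id');

-- 2. store_id is now an ordinary column, so its type can be changed.
ALTER TABLE products ALTER COLUMN store_id TYPE text;

-- 3. Switch the distribution column back.
SELECT alter_distributed_table('products', distribution_column := 'store_id');
```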

Adding/Removing Constraints

Using Citus allows you to continue to enjoy the safety of a relational database, including database constraints (see the PostgreSQL docs). Due to the nature of distributed systems, Citus will not cross-reference uniqueness constraints or referential integrity between worker nodes.

To set up a foreign key between colocated distributed tables, always include the distribution column in the key. This may involve making the key compound.

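Foreign keys may be created in these situations:

  • between two local (non-distributed) tables,

  • between two reference tables,

  • between reference tables and local tables,

  • between two colocated distributed tables when the key includes the distribution column, or

  • as a distributed table referencing a reference table.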

Foreign keys from reference tables to distributed tables are not supported.

Citus supports all referential actions on foreign keys from local to reference tables, but does not support ON DELETE/UPDATE CASCADE in the reverse direction (reference to local).

Note

Primary keys and uniqueness constraints must include the distribution column. Adding them to a non-distribution column will generate an error (see Cannot create uniqueness constraint).

This example shows how to create primary and foreign keys on distributed tables:

  --
  -- Adding a primary key
  -- --------------------
  -- We'll distribute these tables on the account_id. The ads and clicks
  -- tables must use compound keys that include account_id.
  ALTER TABLE accounts ADD PRIMARY KEY (id);
  ALTER TABLE ads ADD PRIMARY KEY (account_id, id);
  ALTER TABLE clicks ADD PRIMARY KEY (account_id, id);
  -- Next distribute the tables
  SELECT create_distributed_table('accounts', 'id');
  SELECT create_distributed_table('ads', 'account_id');
  SELECT create_distributed_table('clicks', 'account_id');
  --
  -- Adding foreign keys
  -- -------------------
  -- Note that this can happen before or after distribution, as long as
  -- there exists a uniqueness constraint on the target column(s) which
  -- can only be enforced before distribution.
  ALTER TABLE ads ADD CONSTRAINT ads_account_fk
      FOREIGN KEY (account_id) REFERENCES accounts (id);
  ALTER TABLE clicks ADD CONSTRAINT clicks_ad_fk
      FOREIGN KEY (account_id, ad_id) REFERENCES ads (account_id, id);

Similarly, include the distribution column in uniqueness constraints:

  -- Suppose we want every ad to use a unique image. Notice we can
  -- enforce it only per account when we distribute by account id.
  ALTER TABLE ads ADD CONSTRAINT ads_unique_image
      UNIQUE (account_id, image_url);

Not-null constraints can be applied to any column (distribution or not) because they require no lookups between workers.

  ALTER TABLE ads ALTER COLUMN image_url SET NOT NULL;

Using NOT VALID Constraints

In some situations it can be useful to enforce constraints for new rows, while allowing existing non-conforming rows to remain unchanged. Citus supports this feature for CHECK constraints and foreign keys, using PostgreSQL’s “NOT VALID” constraint designation.

For example, consider an application which stores user profiles in a reference table.

  -- we're using the "text" column type here, but a real application
  -- might use "citext" which is available in a postgres contrib module
  CREATE TABLE users ( email text PRIMARY KEY );
  SELECT create_reference_table('users');

Imagine that, over time, a few non-addresses get into the table.

  INSERT INTO users VALUES
      ('foo@example.com'), ('hacker12@aol.com'), ('lol');

We would like to validate the addresses, but PostgreSQL does not ordinarily allow us to add a CHECK constraint that fails for existing rows. However, it does allow a constraint marked not valid:

  ALTER TABLE users
      ADD CONSTRAINT syntactic_email
      CHECK (email ~
          '^[a-zA-Z0-9.!#$%&''*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
      ) NOT VALID;

This succeeds, and new rows are protected.

  INSERT INTO users VALUES ('fake');
  /*
  ERROR: new row for relation "users_102010" violates
  check constraint "syntactic_email_102010"
  DETAIL: Failing row contains (fake).
  */

Later, during non-peak hours, a database administrator can attempt to fix the bad rows and re-validate the constraint.

  -- later, attempt to validate all rows
  ALTER TABLE users
      VALIDATE CONSTRAINT syntactic_email;
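For the three sample rows inserted above, only 'lol' fails the pattern, so one possible repair is sketched below. Deleting the row is just one option; a real application might correct the address instead. The final query reads PostgreSQL's pg_constraint catalog on the coordinator to check whether the constraint is now marked as validated.

```sql
-- Remove (or otherwise fix) the non-conforming row.
DELETE FROM users WHERE email = 'lol';

-- Validation scans all rows and fails if any still violate the check.
ALTER TABLE users VALIDATE CONSTRAINT syntactic_email;

-- convalidated is true once the constraint has been validated.
SELECT conname, convalidated
FROM pg_constraint
WHERE conrelid = 'users'::regclass
  AND conname = 'syntactic_email';
```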

The PostgreSQL documentation has more information about NOT VALID and VALIDATE CONSTRAINT in the ALTER TABLE section.

Adding/Removing Indices

Citus supports adding and removing indices:

  -- Adding an index
  CREATE INDEX clicked_at_idx ON clicks USING BRIN (clicked_at);
  -- Removing an index
  DROP INDEX clicked_at_idx;

Adding an index takes a write lock, which can be undesirable in a multi-tenant “system-of-record.” To minimize application downtime, create the index concurrently instead. This method requires more total work than a standard index build and takes significantly longer to complete. However, since it allows normal operations to continue while the index is built, this method is useful for adding new indexes in a production environment.

  -- Adding an index without locking table writes
  CREATE INDEX CONCURRENTLY clicked_at_idx ON clicks USING BRIN (clicked_at);

Manual Modification

Currently, other DDL commands are not auto-propagated. However, you can propagate the changes manually. See Manual Query Propagation.