Identify Distribution Strategy

Pick distribution key

The first step in migrating to Citus is identifying suitable distribution keys and planning table distribution accordingly. In multi-tenant applications this will typically be an internal identifier for tenants. We typically refer to it as the “tenant ID.” The use-cases may vary, so we advise being thorough on this step.

For guidance, read these sections:

  1. Determining Application Type

  2. Choosing Distribution Column

We are happy to help review your environment to be sure that the ideal distribution key is chosen. To do so, we typically examine schema layouts, larger tables, long-running and/or problematic queries, standard use cases, and more.

Identify types of tables

Once a distribution key is identified, review the schema to identify how each table will be handled and whether any modifications to table layouts will be required. We typically advise tracking this with a spreadsheet, and have created a template you can use.

Tables will generally fall into one of the following categories:

  1. Ready for distribution. These tables already contain the distribution key, and are ready for distribution.

  2. Needs backfill. These tables can be logically distributed by the chosen key, but do not contain a column directly referencing it. The tables will be modified later to add the column.

  3. Reference table. These tables are typically small, do not contain the distribution key, are commonly joined by distributed tables, and/or are shared across tenants. A copy of each of these tables will be maintained on all nodes. Common examples include country code lookups, product categories, and the like.

  4. Local table. These are typically not joined to other tables, and do not contain the distribution key. They are maintained exclusively on the coordinator node. Common examples include admin user lookups and other utility tables.

Consider an example multi-tenant application similar to Etsy or Shopify where each tenant is a store. Here’s a portion of a simplified schema:

Schema before migration

(Underlined items are primary keys, italicized items are foreign keys.)

In this example stores are a natural tenant. The tenant id is in this case the store_id. After distributing tables in the cluster, we want rows relating to the same store to reside together on the same nodes.

Prepare Source Tables for Migration

Once the scope of needed database changes is identified, the next major step is to modify the data structure for the application’s existing database. First, tables requiring backfill are modified to add a column for the distribution key.

Add distribution keys

In our storefront example the stores and products tables have a store_id and are ready for distribution. Being normalized, the line_items table lacks a store id. If we want to distribute by store_id, the table needs this column.

  1. -- denormalize line_items by including store_id
  2. ALTER TABLE line_items ADD COLUMN store_id uuid;

Be sure to check that the distribution column has the same type in all tables, e.g. don’t mix int and bigint. The column types must match to ensure proper data colocation.

Backfill newly created columns

Once the schema is updated, backfill missing values for the tenant_id column in tables where the column was added. In our example line_items requires values for store_id.

We backfill the table by obtaining the missing values from a join query with orders:

  1. UPDATE line_items
  2. SET store_id = orders.store_id
  3. FROM line_items
  4. INNER JOIN orders
  5. WHERE line_items.order_id = orders.order_id;

Doing the whole table at once may cause too much load on the database and disrupt other queries. The backfill can done more slowly instead. One way to do that is to make a function that backfills small batches at a time, then call the function repeatedly with pg_cron.

  1. -- the function to backfill up to one
  2. -- thousand rows from line_items
  3. CREATE FUNCTION backfill_batch()
  4. RETURNS void LANGUAGE sql AS $$
  5. WITH batch AS (
  6. SELECT line_items_id, order_id
  7. FROM line_items
  8. WHERE store_id IS NULL
  9. LIMIT 10000
  10. FOR UPDATE
  11. SKIP LOCKED
  12. )
  13. UPDATE line_items AS li
  14. SET store_id = orders.store_id
  15. FROM batch, orders
  16. WHERE batch.line_item_id = li.line_item_id
  17. AND batch.order_id = orders.order_id;
  18. $$;
  19. -- run the function every quarter hour
  20. SELECT cron.schedule('*/15 * * * *', 'SELECT backfill_batch()');
  21. -- ^^ note the return value of cron.schedule

Once the backfill is caught up, the cron job can be disabled:

  1. -- assuming 42 is the job id returned
  2. -- from cron.schedule
  3. SELECT cron.unschedule(42);