Manual Query Propagation

When the user issues a query, the Citus coordinator partitions it into smaller query fragments, each of which can be run independently on a worker shard. This allows Citus to distribute each query across the cluster.

However, the way queries are partitioned into fragments (and which queries are propagated at all) varies by the type of query. In some advanced situations it is useful to manually control this behavior. Citus provides utility functions to propagate SQL to workers, shards, or placements.

Manual query propagation bypasses coordinator logic, locking, and any other consistency checks. These functions are available as a last resort to allow statements which Citus otherwise does not run natively. Use them carefully to avoid data inconsistency and deadlocks.

Running on all Workers

The least granular level of execution is broadcasting a statement for execution on all workers. This is useful for viewing properties of entire worker databases.

  -- List the work_mem setting of each worker database
  SELECT run_command_on_workers($cmd$ SHOW work_mem; $cmd$);
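
Each row of the output corresponds to one worker, and the command's output comes back in a result column, so it can be filtered or aggregated directly. A minimal sketch, assuming the nodename and result output columns:

  -- Show each worker's setting as a separate row
  -- (nodename and result are assumed output column names)
  SELECT nodename, result
  FROM run_command_on_workers($cmd$ SHOW work_mem; $cmd$);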

Although this command can also be used to create database objects on the workers, doing so makes it harder to add worker nodes in an automated fashion. To create functions on the workers, it’s best to use create_distributed_function on the coordinator node instead. Registering functions on the coordinator tracks them in the distributed object metadata table, so Citus knows to automatically create a copy of the function on any nodes added to the cluster in the future.
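
For instance, a minimal sketch of the preferred approach (the increment function here is purely illustrative):

  -- Define the function once on the coordinator...
  CREATE OR REPLACE FUNCTION increment(i int) RETURNS int AS $$
    SELECT i + 1;
  $$ LANGUAGE sql;

  -- ...then register it so Citus creates it on current and future workers
  SELECT create_distributed_function('increment(int)');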

Note

The run_command_on_workers function and other manual propagation commands in this section can run only queries which return a single column and single row.

Running on all Shards

The next level of granularity is running a command across all shards of a particular distributed table. It can be useful, for instance, in reading the properties of a table directly on workers. Queries run locally on a worker node have full access to metadata such as table statistics.

The run_command_on_shards function applies a SQL command to each shard, providing the shard’s name for interpolation into the command. Here is an example that estimates the total row count of a distributed table by consulting the pg_class catalog on each worker for the estimated row count of each shard. Notice the %s, which will be replaced with each shard’s name.

  -- Get the estimated row count for a distributed table by summing the
  -- estimated counts of rows for each shard.
  SELECT sum(result::bigint) AS estimated_count
    FROM run_command_on_shards(
      'my_distributed_table',
      $cmd$
        SELECT reltuples
          FROM pg_class c
          JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
         WHERE (n.nspname || '.' || relname)::regclass = '%s'::regclass
           AND n.nspname NOT IN ('citus', 'pg_toast', 'pg_catalog')
      $cmd$
    );

Running on all Placements

The most granular level of execution is running a command across all shards and their replicas (also known as placements). It can be useful for running data modification commands, which must apply to every replica to ensure consistency.

For example, suppose a distributed table has an updated_at field, and we want to “touch” all rows so that they are marked as updated at a certain time. An ordinary UPDATE statement on the coordinator requires a filter by the distribution column, but we can manually propagate the update across all shards and replicas:

  -- note we're using a hard-coded date rather than
  -- a function such as "now()" because the query will
  -- run at slightly different times on each replica
  SELECT run_command_on_placements(
    'my_distributed_table',
    $cmd$
      UPDATE %s SET updated_at = '2017-01-01';
    $cmd$
  );
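
Because run_command_on_placements returns one row per placement, the same invocation can be filtered to spot placements where the statement failed. A minimal sketch, assuming the shardid, nodename, success, and result output columns:

  -- Run the update and list any placements on which it failed
  -- (shardid, nodename, success, result are assumed output column names)
  SELECT shardid, nodename, result
  FROM run_command_on_placements(
    'my_distributed_table',
    $cmd$ UPDATE %s SET updated_at = '2017-01-01'; $cmd$
  )
  WHERE NOT success;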

A useful companion to run_command_on_placements is run_command_on_colocated_placements. It interpolates the names of two placements of co-located distributed tables into a query. The placement pairs are always chosen to be local to the same worker where full SQL coverage is available. Thus we can use advanced SQL features like triggers to relate the tables:

  -- Suppose we have two distributed tables
  CREATE TABLE little_vals (key int, val int);
  CREATE TABLE big_vals    (key int, val int);
  SELECT create_distributed_table('little_vals', 'key');
  SELECT create_distributed_table('big_vals',    'key');

  -- We want to synchronize them so that every time little_vals
  -- are created, big_vals appear with double the value
  --
  -- First we make a trigger function on each worker, which will
  -- take the destination table placement as an argument
  SELECT run_command_on_workers($cmd$
    CREATE OR REPLACE FUNCTION embiggen() RETURNS TRIGGER AS $$
      BEGIN
        IF (TG_OP = 'INSERT') THEN
          EXECUTE format(
            'INSERT INTO %s (key, val) SELECT ($1).key, ($1).val*2;',
            TG_ARGV[0]
          ) USING NEW;
        END IF;
        RETURN NULL;
      END;
    $$ LANGUAGE plpgsql;
  $cmd$);

  -- Next we relate the co-located tables by the trigger function
  -- on each co-located placement
  SELECT run_command_on_colocated_placements(
    'little_vals',
    'big_vals',
    $cmd$
      CREATE TRIGGER after_insert AFTER INSERT ON %s
        FOR EACH ROW EXECUTE PROCEDURE embiggen(%L)
    $cmd$
  );
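
To see the effect, we can insert through the coordinator and check the synchronized table. This is a quick, illustrative smoke test; the expected rows assume the triggers fire as intended:

  -- Illustrative check: inserts into little_vals should, via the
  -- worker-side triggers, produce doubled rows in big_vals
  INSERT INTO little_vals VALUES (1, 10), (2, 20);
  SELECT * FROM big_vals;  -- expected: (1, 20) and (2, 40)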

Limitations

  • There are no safeguards against deadlock for multi-statement transactions.

  • There are no safeguards against mid-query failures and resulting inconsistencies.

  • Query results are cached in memory; these functions can’t deal with very big result sets.

  • The functions error out early if they cannot connect to a node.

  • You can do very bad things!