Useful Diagnostic Queries

Finding which shard contains data for a specific tenant

The rows of a distributed table are grouped into shards, and each shard is placed on a worker node in the Citus cluster. In the multi-tenant Citus use case we can determine which worker node contains the rows for a specific tenant by putting together two pieces of information: the shard id associated with the tenant id, and the shard placements on workers. The two can be retrieved together in a single query. Suppose our multi-tenant application's tenants are stores, and we want to find which worker node holds the data for Gap.com (suppose its id is 4).

To find the worker node holding the data for store id=4, ask for the placement of rows whose distribution column has value 4:

    SELECT shardid, shardstate, shardlength, nodename, nodeport, placementid
    FROM pg_dist_placement AS placement,
         pg_dist_node AS node
    WHERE placement.groupid = node.groupid
      AND node.noderole = 'primary'
      AND shardid = (
        SELECT get_shard_id_for_distribution_column('stores', 4)
      );

The output contains the host and port of the worker database.

    ┌─────────┬────────────┬─────────────┬───────────┬──────────┬─────────────┐
    │ shardid │ shardstate │ shardlength │ nodename  │ nodeport │ placementid │
    ├─────────┼────────────┼─────────────┼───────────┼──────────┼─────────────┤
    │  102009 │          1 │           0 │ localhost │     5433 │           2 │
    └─────────┴────────────┴─────────────┴───────────┴──────────┴─────────────┘

Finding the distribution column for a table

Each distributed table in Citus has a “distribution column.” For more information about what this is and how it works, see Distributed Data Modeling. There are many situations where it is important to know which column it is. Some operations require joining or filtering on the distribution column, and you may encounter error messages with hints like, “add a filter to the distribution column.”

The pg_dist_* tables on the coordinator node contain diverse metadata about the distributed database. In particular pg_dist_partition holds information about the distribution column (formerly called partition column) for each table. You can use a convenient utility function to look up the distribution column name from the low-level details in the metadata. Here’s an example and its output:

    -- create example table
    CREATE TABLE products (
      store_id bigint,
      product_id bigint,
      name text,
      price money,
      CONSTRAINT products_pkey PRIMARY KEY (store_id, product_id)
    );

    -- pick store_id as distribution column
    SELECT create_distributed_table('products', 'store_id');

    -- get distribution column name for products table
    SELECT column_to_column_name(logicalrelid, partkey) AS dist_col_name
    FROM pg_dist_partition
    WHERE logicalrelid = 'products'::regclass;

Example output:

    ┌───────────────┐
    │ dist_col_name │
    ├───────────────┤
    │ store_id      │
    └───────────────┘

Detecting locks

This query will run across all worker nodes and identify locks, how long they’ve been open, and the offending queries:

    SELECT run_command_on_workers($cmd$
      SELECT array_agg(
        blocked_statement || ' $ ' || cur_stmt_blocking_proc
        || ' $ ' || cnt::text || ' $ ' || age
      )
      FROM (
        SELECT blocked_activity.query AS blocked_statement,
               blocking_activity.query AS cur_stmt_blocking_proc,
               count(*) AS cnt,
               age(now(), min(blocked_activity.query_start)) AS "age"
        FROM pg_catalog.pg_locks blocked_locks
        JOIN pg_catalog.pg_stat_activity blocked_activity
          ON blocked_activity.pid = blocked_locks.pid
        JOIN pg_catalog.pg_locks blocking_locks
          ON blocking_locks.locktype = blocked_locks.locktype
         AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
         AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
         AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
         AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
         AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
         AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
         AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
         AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
         AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
         AND blocking_locks.pid != blocked_locks.pid
        JOIN pg_catalog.pg_stat_activity blocking_activity
          ON blocking_activity.pid = blocking_locks.pid
        WHERE NOT blocked_locks.GRANTED
          AND blocking_locks.GRANTED
        GROUP BY blocked_activity.query,
                 blocking_activity.query
        ORDER BY 4
      ) a
    $cmd$);

Example output:

    ┌───────────────────────────────────────────────────────────────────────────────────┐
    │                               run_command_on_workers                              │
    ├───────────────────────────────────────────────────────────────────────────────────┤
    │ (localhost,5433,t,"")                                                             │
    │ (localhost,5434,t,"{""update ads_102277 set name = 'new name' where id = 1; $ sel…│
    │…ect * from ads_102277 where id = 1 for update; $ 1 $ 00:00:03.729519""}")         │
    └───────────────────────────────────────────────────────────────────────────────────┘
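Each row that run_command_on_workers returns packs the blocked statement, the blocking statement, the count, and the age into a single string joined by ' $ '. A minimal sketch of splitting those fields apart on the client side in Python (the sample entry is copied from the output above; a query that itself contains ' $ ' would need a smarter parser):

```python
# Parse one lock entry produced by the query above. The four fields
# (blocked statement, blocking statement, count, age) are joined by ' $ '.
def parse_lock_entry(entry):
    blocked, blocking, cnt, age = entry.split(" $ ")
    return {
        "blocked_statement": blocked,
        "blocking_statement": blocking,
        "count": int(cnt),
        "age": age,
    }

entry = ("update ads_102277 set name = 'new name' where id = 1; "
         "$ select * from ads_102277 where id = 1 for update; "
         "$ 1 $ 00:00:03.729519")
info = parse_lock_entry(entry)
print(info["count"], info["age"])  # → 1 00:00:03.729519
```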

Querying the size of your shards

This query will provide you with the size of every shard of a given distributed table, designated here with the placeholder my_table:

    SELECT shardid, table_name, shard_size
    FROM citus_shards
    WHERE table_name = 'my_table';

Example output:

     shardid | table_name | shard_size
    ---------+------------+------------
      102170 | my_table   |   90177536
      102171 | my_table   |   90177536
      102172 | my_table   |   91226112
      102173 | my_table   |   90177536

This query uses the Shard information view.
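When reading these rows from an application, a quick follow-up check is the total table size and the shard skew (largest shard divided by smallest). A small sketch in Python, using the shard sizes from the example output above:

```python
# Shard sizes in bytes, copied from the citus_shards output above.
shard_sizes = {
    102170: 90177536,
    102171: 90177536,
    102172: 91226112,
    102173: 90177536,
}

total_bytes = sum(shard_sizes.values())
# A skew ratio near 1.0 means data is spread evenly across shards.
skew = max(shard_sizes.values()) / min(shard_sizes.values())
print(total_bytes, round(skew, 3))  # → 361758720 1.012
```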

Querying the size of all distributed tables

This query gets a list of the sizes for each distributed table plus the size of their indices.

    SELECT table_name, table_size
    FROM citus_tables;

Example output:

    ┌───────────────┬────────────┐
    │  table_name   │ table_size │
    ├───────────────┼────────────┤
    │ github_users  │ 39 MB      │
    │ github_events │ 98 MB      │
    └───────────────┴────────────┘

There are other ways to measure distributed table size, as well. See Determining Table and Relation Size.

Determining Replication Factor per Table

When using Citus replication rather than PostgreSQL streaming replication, each table can have a customized “replication factor.” This controls the number of redundant copies Citus keeps of each of the table’s shards. (See Worker Node Failures.)

To see an overview of this setting for all tables, run:

    SELECT logicalrelid AS tablename,
           count(*) / count(DISTINCT ps.shardid) AS replication_factor
    FROM pg_dist_shard_placement ps
    JOIN pg_dist_shard p ON ps.shardid = p.shardid
    GROUP BY logicalrelid;

Example output:

    ┌───────────────┬────────────────────┐
    │   tablename   │ replication_factor │
    ├───────────────┼────────────────────┤
    │ github_events │                  1 │
    │ github_users  │                  1 │
    └───────────────┴────────────────────┘
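The replication_factor expression works because each shard contributes one placement row per replica, so dividing total placements by distinct shards yields the copy count. The same arithmetic sketched in Python over hypothetical placement rows:

```python
# Hypothetical (shardid, nodename) placement rows for one distributed table;
# each shard appears once for every replica it has.
placements = [
    (102008, "worker-1"), (102008, "worker-2"),
    (102009, "worker-1"), (102009, "worker-2"),
]

distinct_shards = {shardid for shardid, _ in placements}
# total placements / distinct shards = copies per shard
replication_factor = len(placements) // len(distinct_shards)
print(replication_factor)  # → 2
```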

Identifying unused indices

This query will run across all worker nodes and identify any unused indexes for a given distributed table, designated here with the placeholder my_distributed_table:

    SELECT *
    FROM run_command_on_shards('my_distributed_table', $cmd$
      SELECT array_agg(a) AS infos
      FROM (
        SELECT (
          schemaname || '.' || relname || '##' || indexrelname || '##'
          || pg_size_pretty(pg_relation_size(i.indexrelid))::text
          || '##' || idx_scan::text
        ) AS a
        FROM pg_stat_user_indexes ui
        JOIN pg_index i
          ON ui.indexrelid = i.indexrelid
        WHERE NOT indisunique
          AND idx_scan < 50
          AND pg_relation_size(relid) > 5 * 8192
          AND (schemaname || '.' || relname)::regclass = '%s'::regclass
        ORDER BY
          pg_relation_size(i.indexrelid) / NULLIF(idx_scan, 0) DESC NULLS FIRST,
          pg_relation_size(i.indexrelid) DESC
      ) sub
    $cmd$);

Example output:

    ┌─────────┬─────────┬───────────────────────────────────────────────────────────────────────┐
    │ shardid │ success │                                result                                 │
    ├─────────┼─────────┼───────────────────────────────────────────────────────────────────────┤
    │  102008 │ t       │                                                                       │
    │  102009 │ t       │ {"public.my_distributed_table_102009##stupid_index_102009##28 MB##0"} │
    │  102010 │ t       │                                                                       │
    │  102011 │ t       │                                                                       │
    └─────────┴─────────┴───────────────────────────────────────────────────────────────────────┘

Monitoring client connection count

This query gives the count of connections open on the coordinator, grouped by their state:

    SELECT state, count(*)
    FROM pg_stat_activity
    GROUP BY state;

Example output:

    ┌────────┬───────┐
    │ state  │ count │
    ├────────┼───────┤
    │ active │     3 │
    │        │     1 │
    └────────┴───────┘

Viewing system queries

Active queries

The pg_stat_activity view shows which queries are currently executing. You can filter to find the actively executing ones, along with the process ID of their backend:

    SELECT pid, query, state
    FROM pg_stat_activity
    WHERE state != 'idle';

Why are queries waiting

We can also query to see the most common reasons that non-idle queries are waiting. For an explanation of the reasons, check the PostgreSQL documentation.

    SELECT wait_event || ':' || wait_event_type AS type,
           count(*) AS number_of_occurrences
    FROM pg_stat_activity
    WHERE state != 'idle'
    GROUP BY wait_event, wait_event_type
    ORDER BY number_of_occurrences DESC;

Example output when running pg_sleep in a separate query concurrently:

    ┌─────────────────┬───────────────────────┐
    │      type       │ number_of_occurrences │
    ├─────────────────┼───────────────────────┤
    │                 │                     1 │
    │ PgSleep:Timeout │                     1 │
    └─────────────────┴───────────────────────┘

Index hit rate

This query will provide you with your index hit rate across all nodes. Index hit rate is useful in determining how often indices are used when querying:

    -- on coordinator
    SELECT 100 * (sum(idx_blks_hit) - sum(idx_blks_read)) / sum(idx_blks_hit) AS index_hit_rate
    FROM pg_statio_user_indexes;

    -- on workers
    SELECT nodename, result AS index_hit_rate
    FROM run_command_on_workers($cmd$
      SELECT 100 * (sum(idx_blks_hit) - sum(idx_blks_read)) / sum(idx_blks_hit) AS index_hit_rate
      FROM pg_statio_user_indexes;
    $cmd$);

Example output:

    ┌───────────┬────────────────┐
    │ nodename  │ index_hit_rate │
    ├───────────┼────────────────┤
    │ 10.0.0.16 │           96.0 │
    │ 10.0.0.20 │           98.0 │
    └───────────┴────────────────┘

Cache hit rate

Most applications typically access a small fraction of their total data at once. Postgres keeps frequently accessed data in memory to avoid slow reads from disk. You can see statistics about it in the pg_statio_user_tables view.

An important measurement is what percentage of data comes from the memory cache vs the disk in your workload:

    -- on coordinator
    SELECT
      sum(heap_blks_read) AS heap_read,
      sum(heap_blks_hit) AS heap_hit,
      100 * sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) AS cache_hit_rate
    FROM
      pg_statio_user_tables;

    -- on workers
    SELECT nodename, result AS cache_hit_rate
    FROM run_command_on_workers($cmd$
      SELECT
        100 * sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) AS cache_hit_rate
      FROM
        pg_statio_user_tables;
    $cmd$);

Example output:

    ┌───────────┬──────────┬─────────────────────┐
    │ heap_read │ heap_hit │   cache_hit_rate    │
    ├───────────┼──────────┼─────────────────────┤
    │         1 │      132 │ 99.2481203007518796 │
    └───────────┴──────────┴─────────────────────┘

If you find yourself with a ratio significantly lower than 99%, consider increasing the cache available to your database.
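The cache_hit_rate column above is simply heap_blks_hit divided by total block accesses. Here is the same arithmetic in Python, plugging in the numbers from the example output:

```python
# Block counts taken from the pg_statio_user_tables example output above.
heap_read = 1    # blocks that had to come from disk
heap_hit = 132   # blocks served from shared buffers

# Percentage of block accesses satisfied by the cache.
cache_hit_rate = 100 * heap_hit / (heap_hit + heap_read)
print(round(cache_hit_rate, 4))  # → 99.2481
```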