Citus Tables and Views

Coordinator Metadata

Citus divides each distributed table into multiple logical shards based on the distribution column. The coordinator then maintains metadata tables to track statistics and information about the health and location of these shards. In this section, we describe each of these metadata tables and their schema. You can view and query these tables using SQL after logging into the coordinator node.

Partition table

The pg_dist_partition table stores metadata about which tables in the database are distributed. For each distributed table, it also stores information about the distribution method and detailed information about the distribution column.

Name          Type      Description
------------  --------  ------------------------------------------------------------
logicalrelid  regclass  Distributed table to which this row corresponds. This value
                        references the relfilenode column in the pg_class system
                        catalog table.
partmethod    char      The method used for partitioning / distribution. The values
                        of this column corresponding to different distribution
                        methods are: append ‘a’, hash ‘h’, reference table ‘n’.
partkey       text      Detailed information about the distribution column including
                        column number, type and other relevant information.
colocationid  integer   Co-location group to which this table belongs. Tables in the
                        same group allow co-located joins and distributed rollups,
                        among other optimizations. This value references the
                        colocationid column in the pg_dist_colocation table.
repmodel      char      The method used for data replication. The values of this
                        column corresponding to different replication methods are:
                        citus statement-based replication ‘c’,
                        postgresql streaming replication ‘s’,
                        two-phase commit (for reference tables) ‘t’.

    SELECT * from pg_dist_partition;
    logicalrelid | partmethod | partkey | colocationid | repmodel
    ---------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
    github_events | h | {VAR :varno 1 :varattno 4 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 4 :location -1} | 2 | c
    (1 row)
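
For instance, to list just the hash-distributed tables and their co-location groups, you can filter on partmethod using the values described above (a minimal sketch):

    SELECT logicalrelid, colocationid
    FROM pg_dist_partition
    WHERE partmethod = 'h';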

Shard table

The pg_dist_shard table stores metadata about individual shards of a table. This includes information about which distributed table the shard belongs to and statistics about the distribution column for that shard. For append distributed tables, these statistics correspond to the min / max values of the distribution column. For hash distributed tables, they are the hash token ranges assigned to that shard. These statistics are used to prune away unrelated shards during SELECT queries.

Name           Type      Description
-------------  --------  -----------------------------------------------------------
logicalrelid   regclass  Distributed table to which this shard belongs. This value
                         references the relfilenode column in the pg_class system
                         catalog table.
shardid        bigint    Globally unique identifier assigned to this shard.
shardstorage   char      Type of storage used for this shard. Different storage
                         types are discussed in the table below.
shardminvalue  text      For append distributed tables, minimum value of the
                         distribution column in this shard (inclusive).
                         For hash distributed tables, minimum hash token value
                         assigned to that shard (inclusive).
shardmaxvalue  text      For append distributed tables, maximum value of the
                         distribution column in this shard (inclusive).
                         For hash distributed tables, maximum hash token value
                         assigned to that shard (inclusive).

    SELECT * from pg_dist_shard;
    logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
    ---------------+---------+--------------+---------------+---------------
    github_events | 102026 | t | 268435456 | 402653183
    github_events | 102027 | t | 402653184 | 536870911
    github_events | 102028 | t | 536870912 | 671088639
    github_events | 102029 | t | 671088640 | 805306367
    (4 rows)
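
To see how these ranges are used, you can check which shard a particular distribution-column value maps to with get_shard_id_for_distribution_column() (a sketch; the value 42 is arbitrary):

    -- find the shard, and its hash token range, for one distribution-column value
    SELECT shardid, shardminvalue, shardmaxvalue
    FROM pg_dist_shard
    WHERE shardid = get_shard_id_for_distribution_column('github_events', 42::bigint);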

Shard Storage Types

The shardstorage column in pg_dist_shard indicates the type of storage used for the shard. A brief overview of different shard storage types and their representation is below.

Storage Type  Shardstorage value  Description
------------  ------------------  --------------------------------------------------
TABLE         ‘t’                 Indicates that shard stores data belonging to a
                                  regular distributed table.
COLUMNAR      ‘c’                 Indicates that shard stores columnar data. (Used by
                                  distributed cstore_fdw tables)
FOREIGN       ‘f’                 Indicates that shard stores foreign data. (Used by
                                  distributed file_fdw tables)

Shard information view

In addition to the low-level shard metadata table described above, Citus provides a citus_shards view to easily check:

  • Where each shard is (node and port),

  • What kind of table it belongs to, and

  • Its size

This view helps you inspect shards to find, among other things, any size imbalances across nodes.

    SELECT * FROM citus_shards;
    table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
    ------------+---------+--------------+------------------+---------------+-----------+----------+------------
    dist | 102170 | dist_102170 | distributed | 34 | localhost | 9701 | 90677248
    dist | 102171 | dist_102171 | distributed | 34 | localhost | 9702 | 90619904
    dist | 102172 | dist_102172 | distributed | 34 | localhost | 9701 | 90701824
    dist | 102173 | dist_102173 | distributed | 34 | localhost | 9702 | 90693632
    ref | 102174 | ref_102174 | reference | 2 | localhost | 9701 | 8192
    ref | 102174 | ref_102174 | reference | 2 | localhost | 9702 | 8192
    dist2 | 102175 | dist2_102175 | distributed | 34 | localhost | 9701 | 933888
    dist2 | 102176 | dist2_102176 | distributed | 34 | localhost | 9702 | 950272
    dist2 | 102177 | dist2_102177 | distributed | 34 | localhost | 9701 | 942080
    dist2 | 102178 | dist2_102178 | distributed | 34 | localhost | 9702 | 933888
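
For example, to look for size imbalances you can total shard sizes per node (a minimal sketch):

    -- total shard size per node
    SELECT nodename, nodeport, pg_size_pretty(sum(shard_size)) AS total_size
    FROM citus_shards
    GROUP BY nodename, nodeport
    ORDER BY sum(shard_size) DESC;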

The colocation_id refers to the colocation group. For more info about citus_table_type, see Table Types.

Shard placement table

The pg_dist_placement table tracks the location of shard replicas on worker nodes. Each replica of a shard assigned to a specific node is called a shard placement. This table stores information about the health and location of each shard placement.

Name         Type    Description
-----------  ------  --------------------------------------------------------------
placementid  bigint  Unique auto-generated identifier for each individual placement.
shardid      bigint  Shard identifier associated with this placement. This value
                     references the shardid column in the pg_dist_shard catalog
                     table.
shardstate   int     Describes the state of this placement. Different shard states
                     are discussed in the section below.
shardlength  bigint  For append distributed tables, the size of the shard placement
                     on the worker node in bytes.
                     For hash distributed tables, zero.
groupid      int     Identifier used to denote a group of one primary server and
                     zero or more secondary servers.

    SELECT * from pg_dist_placement;
    placementid | shardid | shardstate | shardlength | groupid
    -------------+---------+------------+-------------+---------
    1 | 102008 | 1 | 0 | 1
    2 | 102008 | 1 | 0 | 2
    3 | 102009 | 1 | 0 | 2
    4 | 102009 | 1 | 0 | 3
    5 | 102010 | 1 | 0 | 3
    6 | 102010 | 1 | 0 | 4
    7 | 102011 | 1 | 0 | 4

Note

As of Citus 7.0 the analogous table pg_dist_shard_placement has been deprecated. It included the node name and port for each placement:

    SELECT * from pg_dist_shard_placement;
    shardid | shardstate | shardlength | nodename | nodeport | placementid
    ---------+------------+-------------+-----------+----------+-------------
    102008 | 1 | 0 | localhost | 12345 | 1
    102008 | 1 | 0 | localhost | 12346 | 2
    102009 | 1 | 0 | localhost | 12346 | 3
    102009 | 1 | 0 | localhost | 12347 | 4
    102010 | 1 | 0 | localhost | 12347 | 5
    102010 | 1 | 0 | localhost | 12345 | 6
    102011 | 1 | 0 | localhost | 12345 | 7

That information is now available by joining pg_dist_placement with pg_dist_node on groupid. For compatibility, Citus still provides pg_dist_shard_placement as a view. However, we recommend using the new, more normalized tables when possible.
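
A sketch of that join, recovering the node name and port for each placement on primary nodes:

    SELECT placement.shardid,
           placement.shardstate,
           node.nodename,
           node.nodeport
    FROM pg_dist_placement AS placement
    JOIN pg_dist_node AS node ON placement.groupid = node.groupid
    WHERE node.noderole = 'primary';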

Shard Placement States

Citus manages shard health on a per-placement basis and automatically marks a placement as unavailable if leaving the placement in service would put the cluster in an inconsistent state. The shardstate column in the pg_dist_placement table is used to store the state of shard placements. A brief overview of different shard placement states and their representation is below.

State name  Shardstate value  Description
----------  ----------------  -----------------------------------------------------
FINALIZED   1                 This is the state new shards are created in. Shard
                              placements in this state are considered up-to-date and
                              are used in query planning and execution.
INACTIVE    3                 Shard placements in this state are considered inactive
                              due to being out-of-sync with other replicas of the
                              same shard. This can occur when an append, modification
                              (INSERT, UPDATE or DELETE) or a DDL operation fails for
                              this placement. The query planner will ignore
                              placements in this state during planning and execution.
                              Users can synchronize the data in these shards with a
                              finalized replica as a background activity.
TO_DELETE   4                 If Citus attempts to drop a shard placement in response
                              to a master_apply_delete_command call and fails, the
                              placement is moved to this state. Users can then delete
                              these shards as a subsequent background activity.
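
For a quick health overview, you can count placements per state; in a healthy cluster every placement is in state 1 (FINALIZED):

    SELECT shardstate, count(*)
    FROM pg_dist_placement
    GROUP BY shardstate;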

Worker node table

The pg_dist_node table contains information about the worker nodes in the cluster.

Name              Type     Description
----------------  -------  ---------------------------------------------------------
nodeid            int      Auto-generated identifier for an individual node.
groupid           int      Identifier used to denote a group of one primary server
                           and zero or more secondary servers. By default it is the
                           same as the nodeid.
nodename          text     Host name or IP address of the PostgreSQL worker node.
nodeport          int      Port number on which the PostgreSQL worker node is
                           listening.
noderack          text     (Optional) Rack placement information for the worker
                           node.
hasmetadata       boolean  Reserved for internal use.
isactive          boolean  Whether the node is active and accepting shard
                           placements.
noderole          text     Whether the node is a primary or secondary.
nodecluster       text     The name of the cluster containing this node.
metadatasynced    boolean  Reserved for internal use.
shouldhaveshards  boolean  If false, shards will be moved off the node (drained)
                           when rebalancing, and shards from new distributed tables
                           will not be placed on the node, unless they are colocated
                           with shards already there.

    SELECT * from pg_dist_node;
    nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
    --------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
    1 | 1 | localhost | 12345 | default | f | t | primary | default | f | t
    2 | 2 | localhost | 12346 | default | f | t | primary | default | f | t
    3 | 3 | localhost | 12347 | default | f | t | primary | default | f | t
    (3 rows)

Distributed object table

The citus.pg_dist_object table contains a list of objects such as types and functions that have been created on the coordinator node and propagated to worker nodes. When an administrator adds new worker nodes to the cluster, Citus automatically creates copies of the distributed objects on the new nodes (in the correct order to satisfy object dependencies).

Name                         Type     Description
---------------------------  -------  ------------------------------------------------
classid                      oid      Class of the distributed object
objid                        oid      Object id of the distributed object
objsubid                     integer  Object sub id of the distributed object, e.g.
                                      attnum
type                         text     Part of the stable address used during pg
                                      upgrades
object_names                 text[]   Part of the stable address used during pg
                                      upgrades
object_args                  text[]   Part of the stable address used during pg
                                      upgrades
distribution_argument_index  integer  Only valid for distributed functions/procedures
colocationid                 integer  Only valid for distributed functions/procedures

“Stable addresses” uniquely identify objects independently of a specific server. Citus tracks objects during a PostgreSQL upgrade using stable addresses created with the pg_identify_object_as_address() function.

Here’s an example of how create_distributed_function() adds entries to the citus.pg_dist_object table:

    CREATE TYPE stoplight AS enum ('green', 'yellow', 'red');

    CREATE OR REPLACE FUNCTION intersection()
    RETURNS stoplight AS $$
    DECLARE
      color stoplight;
    BEGIN
      SELECT *
        FROM unnest(enum_range(NULL::stoplight)) INTO color
       ORDER BY random() LIMIT 1;
      RETURN color;
    END;
    $$ LANGUAGE plpgsql VOLATILE;

    SELECT create_distributed_function('intersection()');

    -- will have two rows, one for the TYPE and one for the FUNCTION
    TABLE citus.pg_dist_object;

    -[ RECORD 1 ]---------------+------
    classid | 1247
    objid | 16780
    objsubid | 0
    type |
    object_names |
    object_args |
    distribution_argument_index |
    colocationid |
    -[ RECORD 2 ]---------------+------
    classid | 1255
    objid | 16788
    objsubid | 0
    type |
    object_names |
    object_args |
    distribution_argument_index |
    colocationid |
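
The classid / objid / objsubid triples above are the raw object identifiers; they can be resolved into the stable-address form with PostgreSQL's pg_identify_object_as_address(), for example:

    -- resolve each distributed object to its stable address
    SELECT addr.type, addr.object_names, addr.object_args
    FROM citus.pg_dist_object,
         LATERAL pg_identify_object_as_address(classid, objid, objsubid) AS addr;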

Citus tables view

The citus_tables view shows a summary of all tables managed by Citus (distributed and reference tables). It combines information from several Citus metadata tables into an easy, human-readable overview of table properties such as type, distribution column, size, and owner.

Here’s an example:

    SELECT * FROM citus_tables;
    ┌────────────┬──────────────────┬─────────────────────┬───────────────┬────────────┬─────────────┬─────────────┬───────────────┐
    │ table_name │ citus_table_type │ distribution_column │ colocation_id │ table_size │ shard_count │ table_owner │ access_method │
    ├────────────┼──────────────────┼─────────────────────┼───────────────┼────────────┼─────────────┼─────────────┼───────────────┤
    │ foo.test   │ distributed      │ test_column         │             1 │ 0 bytes    │          32 │ citus       │ heap          │
    │ ref        │ reference        │ <none>              │             2 │ 24 GB      │           1 │ citus       │ heap          │
    │ test       │ distributed      │ id                  │             1 │ 248 TB     │          32 │ citus       │ heap          │
    └────────────┴──────────────────┴─────────────────────┴───────────────┴────────────┴─────────────┴─────────────┴───────────────┘

Time partitions view

Citus provides UDFs to manage partitions for the Timeseries Data use case. It also maintains a time_partitions view to inspect the partitions it manages.

Columns:

  • parent_table: the table which is partitioned

  • partition_column: the column on which the parent table is partitioned

  • partition: the name of a partition table

  • from_value: lower bound in time for rows in this partition

  • to_value: upper bound in time for rows in this partition

  • access_method: heap for row-based storage, and columnar for columnar storage

    SELECT * FROM time_partitions;
    ┌────────────────────────┬──────────────────┬─────────────────────────────────────────┬─────────────────────┬─────────────────────┬───────────────┐
    │ parent_table           │ partition_column │ partition                               │ from_value          │ to_value            │ access_method │
    ├────────────────────────┼──────────────────┼─────────────────────────────────────────┼─────────────────────┼─────────────────────┼───────────────┤
    │ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0000 │ 2015-01-01 00:00:00 │ 2015-01-01 02:00:00 │ columnar      │
    │ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0200 │ 2015-01-01 02:00:00 │ 2015-01-01 04:00:00 │ columnar      │
    │ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0400 │ 2015-01-01 04:00:00 │ 2015-01-01 06:00:00 │ columnar      │
    │ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0600 │ 2015-01-01 06:00:00 │ 2015-01-01 08:00:00 │ heap          │
    └────────────────────────┴──────────────────┴─────────────────────────────────────────┴─────────────────────┴─────────────────────┴───────────────┘
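
For example, to list partitions of a given parent table whose data is entirely older than some cutoff, you could query the view like this (the cutoff timestamp is illustrative, and from_value / to_value are assumed to be text timestamps as shown above):

    SELECT partition, from_value, to_value, access_method
    FROM time_partitions
    WHERE parent_table = 'github_columnar_events'::regclass
      AND to_value::timestamptz < '2015-01-01 04:00:00';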

Co-location group table

The pg_dist_colocation table contains information about which tables’ shards should be placed together, or co-located. When two tables are in the same co-location group, Citus ensures shards with the same partition values will be placed on the same worker nodes. This enables join optimizations, certain distributed rollups, and foreign key support. Shard co-location is inferred when the shard counts, replication factors, and partition column types all match between two tables; however, a custom co-location group may be specified when creating a distributed table, if so desired.
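
For example, a table can be placed explicitly in the same co-location group as an existing table with the colocate_with argument of create_distributed_table (the table names here are hypothetical):

    SELECT create_distributed_table('events', 'tenant_id',
                                    colocate_with => 'users');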

Name                         Type  Description
---------------------------  ----  ---------------------------------------------------
colocationid                 int   Unique identifier for the co-location group this
                                   row corresponds to.
shardcount                   int   Shard count for all tables in this co-location
                                   group.
replicationfactor            int   Replication factor for all tables in this
                                   co-location group.
distributioncolumntype       oid   The type of the distribution column for all tables
                                   in this co-location group.
distributioncolumncollation  oid   The collation of the distribution column for all
                                   tables in this co-location group.

    SELECT * from pg_dist_colocation;
    colocationid | shardcount | replicationfactor | distributioncolumntype | distributioncolumncollation
    --------------+------------+-------------------+------------------------+-----------------------------
    2 | 32 | 2 | 20 | 0
    (1 row)

Rebalancer strategy table

This table defines strategies that rebalance_table_shards can use to determine where to move shards.

Name                            Type     Description
------------------------------  -------  ----------------------------------------------
name                            name     Unique name for the strategy.
default_strategy                boolean  Whether rebalance_table_shards should choose
                                         this strategy by default. Use
                                         citus_set_default_rebalance_strategy to update
                                         this column.
shard_cost_function             regproc  Identifier for a cost function, which must
                                         take a shardid as bigint and return its notion
                                         of a cost, as type real.
node_capacity_function          regproc  Identifier for a capacity function, which must
                                         take a nodeid as int and return its notion of
                                         node capacity, as type real.
shard_allowed_on_node_function  regproc  Identifier for a function that, given shardid
                                         bigint and nodeidarg int, returns a boolean
                                         for whether the shard is allowed to be stored
                                         on the node.
default_threshold               float4   Threshold for deeming a node too full or too
                                         empty, which determines when
                                         rebalance_table_shards should try to move
                                         shards.
minimum_threshold               float4   A safeguard to prevent the threshold argument
                                         of rebalance_table_shards() from being set too
                                         low.
improvement_threshold           float4   Determines when moving a shard is worth it
                                         during a rebalance. The rebalancer will move a
                                         shard when the ratio of the improvement with
                                         the shard move to the improvement without it
                                         crosses the threshold. This is most useful
                                         with the by_disk_size strategy.

A Citus installation ships with these strategies in the table:

    SELECT * FROM pg_dist_rebalance_strategy;

    -[ RECORD 1 ]------------------+---------------------------------
    name | by_shard_count
    default_strategy | t
    shard_cost_function | citus_shard_cost_1
    node_capacity_function | citus_node_capacity_1
    shard_allowed_on_node_function | citus_shard_allowed_on_node_true
    default_threshold | 0
    minimum_threshold | 0
    improvement_threshold | 0
    -[ RECORD 2 ]------------------+---------------------------------
    name | by_disk_size
    default_strategy | f
    shard_cost_function | citus_shard_cost_by_disk_size
    node_capacity_function | citus_node_capacity_1
    shard_allowed_on_node_function | citus_shard_allowed_on_node_true
    default_threshold | 0.1
    minimum_threshold | 0.01
    improvement_threshold | 0.5

The default strategy, by_shard_count, assigns every shard the same cost. Its effect is to equalize the shard count across nodes. The other predefined strategy, by_disk_size, assigns a cost to each shard matching its disk size in bytes plus that of the shards that are colocated with it. The disk size is calculated using pg_total_relation_size, so it includes indices. This strategy attempts to achieve the same disk usage on every node. Note the threshold of 0.1, which prevents unnecessary shard movement caused by insignificant differences in disk space.
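
For example, a rebalance can be run with the non-default strategy by passing its name; the sketch below assumes the rebalance_strategy parameter of rebalance_table_shards:

    SELECT rebalance_table_shards(rebalance_strategy := 'by_disk_size');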

Creating custom rebalancer strategies

Here are examples of functions that can be used within new shard rebalancer strategies and registered in the Rebalancer strategy table with the citus_add_rebalance_strategy function; a registration sketch follows the examples.

  • Setting a node capacity exception by hostname pattern:

    -- example of node_capacity_function
    CREATE FUNCTION v2_node_double_capacity(nodeidarg int)
        RETURNS real AS $$
        SELECT
            (CASE WHEN nodename LIKE '%.v2.worker.citusdata.com' THEN 2.0::float4 ELSE 1.0::float4 END)
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;
  • Rebalancing by number of queries that go to a shard, as measured by the Query statistics table:

    -- example of shard_cost_function
    CREATE FUNCTION cost_of_shard_by_number_of_queries(shardid bigint)
        RETURNS real AS $$
        SELECT coalesce(sum(calls)::real, 0.001) as shard_total_queries
        FROM citus_stat_statements
        WHERE partition_key is not null
          AND get_shard_id_for_distribution_column('tab', partition_key) = shardid;
    $$ LANGUAGE sql;
  • Isolating a specific shard (10000) on a node (address ‘10.0.0.1’):

    -- example of shard_allowed_on_node_function
    CREATE FUNCTION isolate_shard_10000_on_10_0_0_1(shardid bigint, nodeidarg int)
        RETURNS boolean AS $$
        SELECT
            (CASE WHEN nodename = '10.0.0.1' THEN shardid = 10000 ELSE shardid != 10000 END)
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;

    -- The next two definitions are recommended in combination with the above function.
    -- This way the average utilization of nodes is not impacted by the isolated shard.
    CREATE FUNCTION no_capacity_for_10_0_0_1(nodeidarg int)
        RETURNS real AS $$
        SELECT
            (CASE WHEN nodename = '10.0.0.1' THEN 0 ELSE 1 END)::real
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;

    CREATE FUNCTION no_cost_for_10000(shardid bigint)
        RETURNS real AS $$
        SELECT
            (CASE WHEN shardid = 10000 THEN 0 ELSE 1 END)::real
        $$ LANGUAGE sql;
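
A sketch of registering a strategy built from the functions above; the exact argument list of citus_add_rebalance_strategy is assumed here from the columns of the Rebalancer strategy table:

    SELECT citus_add_rebalance_strategy(
        'isolate_shard_10000',              -- name
        'no_cost_for_10000',                -- shard_cost_function
        'no_capacity_for_10_0_0_1',         -- node_capacity_function
        'isolate_shard_10000_on_10_0_0_1',  -- shard_allowed_on_node_function
        0,                                  -- default_threshold
        0                                   -- minimum_threshold
    );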

Query statistics table

Note

The citus_stat_statements view is a part of Citus Enterprise. Please contact us to obtain this functionality.

Citus provides citus_stat_statements for stats about how queries are being executed, and for whom. It’s analogous to (and can be joined with) the pg_stat_statements view in PostgreSQL which tracks statistics about query speed.

This view can trace queries to originating tenants in a multi-tenant application, which helps when deciding whether to do Tenant Isolation.

Name           Type    Description
-------------  ------  ---------------------------------------------------------
queryid        bigint  identifier (good for pg_stat_statements joins)
userid         oid     user who ran the query
dbid           oid     database instance of coordinator
query          text    anonymized query string
executor       text    Citus executor used: adaptive, or insert-select
partition_key  text    value of distribution column in router-executed queries,
                       else NULL
calls          bigint  number of times the query was run

    -- create and populate distributed table
    create table foo ( id int );
    select create_distributed_table('foo', 'id');
    insert into foo select generate_series(1,100);

    -- enable stats
    -- pg_stat_statements must be in shared_preload_libraries
    create extension pg_stat_statements;

    select count(*) from foo;
    select * from foo where id = 42;

    select * from citus_stat_statements;

Results:

    -[ RECORD 1 ]-+----------------------------------------------
    queryid | -909556869173432820
    userid | 10
    dbid | 13340
    query | insert into foo select generate_series($1,$2)
    executor | insert-select
    partition_key |
    calls | 1
    -[ RECORD 2 ]-+----------------------------------------------
    queryid | 3919808845681956665
    userid | 10
    dbid | 13340
    query | select count(*) from foo;
    executor | adaptive
    partition_key |
    calls | 1
    -[ RECORD 3 ]-+----------------------------------------------
    queryid | 5351346905785208738
    userid | 10
    dbid | 13340
    query | select * from foo where id = $1
    executor | adaptive
    partition_key | 42
    calls | 1
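
Since partition_key identifies the tenant for router-executed queries, a query like the following sketch shows which tenants issue the most queries, which is useful input for Tenant Isolation decisions:

    SELECT partition_key AS tenant_id,
           count(*) AS unique_queries,
           sum(calls) AS total_calls
    FROM citus_stat_statements
    WHERE partition_key IS NOT NULL
    GROUP BY partition_key
    ORDER BY total_calls DESC
    LIMIT 10;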

Caveats:

  • The stats data is not replicated, and won’t survive database crashes or failover

  • Tracks a limited number of queries, set by the pg_stat_statements.max GUC (default 5000)

  • To truncate the table, use the citus_stat_statements_reset() function

Distributed Query Activity

In some situations, queries might get blocked on row-level locks on one of the shards on a worker node. If that happens, those queries will not show up in pg_locks on the Citus coordinator node.

Citus provides special views to watch queries and locks throughout the cluster, including shard-specific queries used internally to build results for distributed queries.

  • citus_dist_stat_activity: shows the distributed queries that are executing on all nodes. A superset of pg_stat_activity, usable wherever the latter is.

  • citus_worker_stat_activity: shows queries on workers, including fragment queries against individual shards.

  • citus_lock_waits: Blocked queries throughout the cluster.

The first two views include all columns of pg_stat_activity plus the host/port of the worker that initiated the query and the host/port of the coordinator node of the cluster.

For example, consider counting the rows in a distributed table:

    -- run from worker on localhost:9701
    SELECT count(*) FROM users_table;

We can see the query appear in citus_dist_stat_activity:

    SELECT * FROM citus_dist_stat_activity;

    -[ RECORD 1 ]----------+----------------------------------
    query_hostname | localhost
    query_hostport | 9701
    master_query_host_name | localhost
    master_query_host_port | 9701
    transaction_number | 1
    transaction_stamp | 2018-10-05 13:27:20.691907+03
    datid | 12630
    datname | postgres
    pid | 23723
    usesysid | 10
    usename | citus
    application_name | psql
    client_addr |
    client_hostname |
    client_port | -1
    backend_start | 2018-10-05 13:27:14.419905+03
    xact_start | 2018-10-05 13:27:16.362887+03
    query_start | 2018-10-05 13:27:20.682452+03
    state_change | 2018-10-05 13:27:20.896546+03
    wait_event_type | Client
    wait_event | ClientRead
    state | idle in transaction
    backend_xid |
    backend_xmin |
    query | SELECT count(*) FROM users_table;
    backend_type | client backend

This query requires information from all shards. Some of the information is in shard users_table_102038, which happens to be stored on localhost:9700. We can see a query accessing the shard by looking at the citus_worker_stat_activity view:

    SELECT * FROM citus_worker_stat_activity;

    -[ RECORD 1 ]----------+-----------------------------------------------------------------------------------------
    query_hostname | localhost
    query_hostport | 9700
    master_query_host_name | localhost
    master_query_host_port | 9701
    transaction_number | 1
    transaction_stamp | 2018-10-05 13:27:20.691907+03
    datid | 12630
    datname | postgres
    pid | 23781
    usesysid | 10
    usename | citus
    application_name | citus
    client_addr | ::1
    client_hostname |
    client_port | 51773
    backend_start | 2018-10-05 13:27:20.75839+03
    xact_start | 2018-10-05 13:27:20.84112+03
    query_start | 2018-10-05 13:27:20.867446+03
    state_change | 2018-10-05 13:27:20.869889+03
    wait_event_type | Client
    wait_event | ClientRead
    state | idle in transaction
    backend_xid |
    backend_xmin |
    query | COPY (SELECT count(*) AS count FROM users_table_102038 users_table WHERE true) TO STDOUT
    backend_type | client backend

The query field shows data being copied out of the shard to be counted.

Note

If a router query (e.g. single-tenant in a multi-tenant application, SELECT * FROM table WHERE tenant_id = X) is executed without a transaction block, then the master_query_host_name and master_query_host_port columns will be NULL in citus_worker_stat_activity.

Here are examples of useful queries you can build using citus_worker_stat_activity:

    -- active queries' wait events on a certain node
    SELECT query, wait_event_type, wait_event
    FROM citus_worker_stat_activity
    WHERE query_hostname = 'xxxx' and state='active';

    -- active queries' top wait events
    SELECT wait_event, wait_event_type, count(*)
    FROM citus_worker_stat_activity
    WHERE state='active'
    GROUP BY wait_event, wait_event_type
    ORDER BY count(*) desc;

    -- total internal connections generated per node by Citus
    SELECT query_hostname, count(*)
    FROM citus_worker_stat_activity
    GROUP BY query_hostname;

    -- total internal active connections generated per node by Citus
    SELECT query_hostname, count(*)
    FROM citus_worker_stat_activity
    WHERE state='active'
    GROUP BY query_hostname;

The next view is citus_lock_waits. To see how it works, we can generate a locking situation manually. First we’ll set up a test table from the coordinator:

    CREATE TABLE numbers AS
      SELECT i, 0 AS j FROM generate_series(1,10) AS i;

    SELECT create_distributed_table('numbers', 'i');

Then, using two sessions on the coordinator, we can run this sequence of statements:

    -- session 1                             -- session 2
    -------------------------------------    -------------------------------------
    BEGIN;
    UPDATE numbers SET j = 2 WHERE i = 1;
                                             BEGIN;
                                             UPDATE numbers SET j = 3 WHERE i = 1;
                                             -- (this blocks)

The citus_lock_waits view shows the situation.

    SELECT * FROM citus_lock_waits;

    -[ RECORD 1 ]-------------------------+----------------------------------------
    waiting_pid | 88624
    blocking_pid | 88615
    blocked_statement | UPDATE numbers SET j = 3 WHERE i = 1;
    current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1;
    waiting_node_id | 0
    blocking_node_id | 0
    waiting_node_name | coordinator_host
    blocking_node_name | coordinator_host
    waiting_node_port | 5432
    blocking_node_port | 5432

In this example the queries originated on the coordinator, but the view can also list locks between queries originating on workers.

Tables on all Nodes

Citus has other informational tables and views which are accessible on all nodes, not just the coordinator.

Connection Credentials Table

Note

This table is a part of Citus Enterprise Edition. Please contact us to obtain this functionality.

The pg_dist_authinfo table holds authentication parameters used by Citus nodes to connect to one another.

Name      Type     Description
--------  -------  --------------------------------------------
nodeid    integer  Node id from Worker node table, or 0, or -1
rolename  name     Postgres role
authinfo  text     Space-separated libpq connection parameters

Upon beginning a connection, a node consults the table to see whether a row with the destination nodeid and desired rolename exists. If so, the node includes the corresponding authinfo string in its libpq connection. A common example is to store a password, like 'password=abc123', but you can review the full list of possibilities in the PostgreSQL libpq connection-parameter documentation.

The parameters in authinfo are space-separated, in the form key=val. To write an empty value, or a value containing spaces, surround it with single quotes, e.g., keyword='a value'. Single quotes and backslashes within the value must be escaped with a backslash, i.e., \' and \\.

The nodeid column can also take the special values 0 and -1, which mean all nodes or loopback connections, respectively. If, for a given node, both specific and all-node rules exist, the specific rule has precedence.

    SELECT * FROM pg_dist_authinfo;
    nodeid | rolename | authinfo
    --------+----------+-----------------
    123 | jdoe | password=abc123
    (1 row)
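
Building on the quoting rules above, a fallback entry that applies to all nodes (nodeid 0) and stores a password containing a space might look like this (the role and password are hypothetical):

    INSERT INTO pg_dist_authinfo (nodeid, rolename, authinfo)
    VALUES (0, 'jdoe', $$password='top secret'$$);
    -- the inner single quotes follow the authinfo quoting rules described above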

Connection Pooling Credentials

Note

This table is a part of Citus Enterprise Edition. Please contact us to obtain this functionality.

If you want to use a connection pooler to connect to a node, you can specify the pooler options using pg_dist_poolinfo. This metadata table holds the host, port and database name for Citus to use when connecting to a node through a pooler.

If pool information is present, Citus will try to use these values instead of setting up a direct connection. The pg_dist_poolinfo information in this case supersedes pg_dist_node.

Name      Type     Description
--------  -------  --------------------------------------------------
nodeid    integer  Node id from Worker node table
poolinfo  text     Space-separated parameters: host, port, or dbname

Note

In some situations Citus ignores the settings in pg_dist_poolinfo. For instance Shard rebalancing is not compatible with connection poolers such as pgbouncer. In these scenarios Citus will use a direct connection.

    -- how to connect to node 1 (as identified in pg_dist_node)
    INSERT INTO pg_dist_poolinfo (nodeid, poolinfo)
         VALUES (1, 'host=127.0.0.1 port=5433');