Real-time Analytics

In this tutorial, we will demonstrate how you can use Citus to ingest events data and run analytical queries on that data in near real time. For that, we will use a sample Github events dataset.

Note

This tutorial assumes that you already have Citus installed and running. If you don’t have Citus running, you can set up Citus locally using one of the options from Single-Node Citus.

Data model and sample data

We will demo building the database for a real-time analytics application. This application will insert large volumes of events data and enable analytical queries on that data with sub-second latencies. In our example, we’re going to work with the Github events dataset. This dataset includes all public events on Github, such as commits, forks, new issues, and comments on these issues.

We will use two Postgres tables to represent this data. To get started, you will need to download sample data for these tables:

  curl https://examples.citusdata.com/tutorial/users.csv > users.csv
  curl https://examples.citusdata.com/tutorial/events.csv > events.csv

If you are using Docker, you should use the docker cp command to copy the files into the Docker container.

  docker cp users.csv citus:.
  docker cp events.csv citus:.

Creating tables

To start, connect to the Citus coordinator using psql.

If you are using native Postgres, as installed in our Single-Node Citus guide, the coordinator node will be running on port 9700.

  psql -p 9700

If you are using Docker, you can connect by running psql with the docker exec command:

  docker exec -it citus psql -U postgres

Then, you can create the tables using standard PostgreSQL CREATE TABLE commands.

  CREATE TABLE github_events
  (
      event_id bigint,
      event_type text,
      event_public boolean,
      repo_id bigint,
      payload jsonb,
      repo jsonb,
      user_id bigint,
      org jsonb,
      created_at timestamp
  );

  CREATE TABLE github_users
  (
      user_id bigint,
      url text,
      login text,
      avatar_url text,
      gravatar_id text,
      display_login text
  );

Next, you can create indexes on events data just like you would do in PostgreSQL. In this example, we’re also going to create a GIN index to make querying on jsonb fields faster.

  CREATE INDEX event_type_index ON github_events (event_type);
  CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);
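As a rough illustration of what the GIN index is for: jsonb_path_ops indexes support containment (@>) filters on the payload column, so a query like the following sketch (assuming the sample data has been loaded) can use the index rather than scanning every row:

  SELECT count(*)
  FROM github_events
  WHERE payload @> '{"ref_type": "repository"}';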

Distributing tables and loading data

We will now tell Citus to distribute these tables across the nodes in the cluster. To do so, run create_distributed_table and specify the table you want to shard and the column you want to shard on. In this case, we will shard both tables on user_id.

  SELECT create_distributed_table('github_users', 'user_id');
  SELECT create_distributed_table('github_events', 'user_id');

Sharding both tables on the user identifier allows Citus to co-locate them, which enables efficient joins and distributed roll-ups. You can learn more about the benefits of co-location in the Citus documentation.
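If you want to check how the tables were distributed, recent Citus versions (10 and later; this is an assumption about your installed version) expose a metadata view you can query from the coordinator:

  SELECT table_name, citus_table_type, distribution_column, shard_count
  FROM citus_tables;

Both tables should appear as distributed on user_id with the same shard count, which is what makes them co-located.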

Then, you can load the data we downloaded into the tables using the standard PostgreSQL \copy command. Make sure that you specify the correct file path if you downloaded the files to a different location.

  \copy github_users from 'users.csv' with csv
  \copy github_events from 'events.csv' with csv

Running queries

Now that we have loaded data into the tables, let’s go ahead and run some queries. First, let’s check how many users we have in our distributed database.

  SELECT count(*) FROM github_users;

Now, let’s analyze Github push events in our data. We will first compute the number of commits per minute by summing the number of distinct commits in each push event.

  SELECT date_trunc('minute', created_at) AS minute,
         sum((payload->>'distinct_size')::int) AS num_commits
  FROM github_events
  WHERE event_type = 'PushEvent'
  GROUP BY minute
  ORDER BY minute;

Since we also have a users table, we can easily join users with events to find, for example, the top ten users who created the most repositories.

  SELECT login, count(*)
  FROM github_events ge
  JOIN github_users gu
  ON ge.user_id = gu.user_id
  WHERE event_type = 'CreateEvent' AND payload @> '{"ref_type": "repository"}'
  GROUP BY login
  ORDER BY count(*) DESC LIMIT 10;

Citus also supports standard INSERT, UPDATE, and DELETE commands for ingesting and modifying data. For example, you can update a user’s display login by running the following command:

  UPDATE github_users SET display_login = 'no1youknow' WHERE user_id = 24305673;
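INSERT and DELETE work the same way; as long as the statement includes the distribution column, Citus routes it to the right shard. Here is a small sketch (the user_id and login values are made up for illustration):

  -- insert a hypothetical user, then remove it again
  INSERT INTO github_users (user_id, login) VALUES (99999999, 'examplelogin');
  DELETE FROM github_users WHERE user_id = 99999999;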

With this, we come to the end of our tutorial. As a next step, you can look at the Real-Time Apps section to see how you can model your own data and power real-time analytical applications.