TimescaleDB Integration

Installation

  1. Install TimescaleDB as described in the TimescaleDB documentation.

  2. Install the Debezium PostgreSQL connector according to the instructions in the Debezium installation guide.

  3. Configure TimescaleDB, and deploy the connector.

How it works

Debezium can capture events from the following TimescaleDB functions:

  • Hypertables

  • Continuous aggregates

  • Compression

These three functions are internally dependent. Each function is built on the PostgreSQL functionality for storing data in tables. Debezium supports all three functions to differing degrees.

The TimescaleDB single message transformation (SMT) needs access to TimescaleDB metadata. Because the SMT cannot access the database configuration at the connector level, you must explicitly define the database connection settings for the transformation.

Hypertables

A hypertable is a logical table that is used to store time-series data. Data is chunked (partitioned) according to a defined time-bound column. TimescaleDB creates one or more physical tables in its internal schema, with each table representing a single chunk. By default, the connector captures changes from each chunk table, and streams the changes to individual topics that correspond to each chunk. The TimescaleDB transformation reassembles data from the separate topics, and then streams the reassembled data to a single topic.

  • The transformation has access to TimescaleDB metadata to obtain chunk/hypertable mapping.

  • The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern: <prefix>.<hypertable-schema-name>.<hypertable-name>

  • The transformation adds the following headers to the event:

    __debezium_timescaledb_chunk_table

    The name of the physical table that stores the event data.

    __debezium_timescaledb_chunk_schema

    The name of the schema that the physical table belongs to.

Example: Streaming data from a hypertable

The following example shows the SQL commands for creating a conditions hypertable in the public schema:

  CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
  SELECT create_hypertable('conditions', 'time');

The TimescaleDB SMT routes change events captured in the hypertable to a topic with the name timescaledb.public.conditions. The transformation enriches event messages with headers that you define in the configuration. For example:

  __debezium_timescaledb_chunk_table: _hyper_1_1_chunk
  __debezium_timescaledb_chunk_schema: _timescaledb_internal
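
The routing described in this section can be modeled in a few lines of Python. This is a minimal sketch for illustration only, not the Debezium implementation: the CHUNK_TO_HYPERTABLE dictionary is a hypothetical stand-in for the metadata that the SMT reads from TimescaleDB, the route_chunk_event function is invented for this example, and the input topic is assumed to follow the <topic.prefix>.<chunk-schema>.<chunk-table> naming.

```python
# Illustrative model of hypertable routing; not Debezium's implementation.
# Hypothetical stand-in for the chunk-to-hypertable metadata that the SMT
# reads from TimescaleDB: (chunk schema, chunk table) -> (schema, hypertable).
CHUNK_TO_HYPERTABLE = {
    ("_timescaledb_internal", "_hyper_1_1_chunk"): ("public", "conditions"),
}


def route_chunk_event(source_topic, target_prefix="timescaledb"):
    """Return (logical topic, headers) for an event from a chunk-specific topic.

    Assumes the source topic is named <topic.prefix>.<chunk-schema>.<chunk-table>.
    """
    # Split from the right so that a topic prefix containing dots stays intact.
    _, chunk_schema, chunk_table = source_topic.rsplit(".", 2)
    schema, hypertable = CHUNK_TO_HYPERTABLE[(chunk_schema, chunk_table)]
    headers = {
        "__debezium_timescaledb_chunk_table": chunk_table,
        "__debezium_timescaledb_chunk_schema": chunk_schema,
    }
    return f"{target_prefix}.{schema}.{hypertable}", headers


topic, headers = route_chunk_event("dbserver1._timescaledb_internal._hyper_1_1_chunk")
print(topic)
print(headers)
```

In this model, every chunk of the conditions hypertable resolves to the same logical topic, while the headers preserve the physical origin of each event.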

Continuous aggregates

Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables. The aggregate view is backed by its own hypertable, which in turn is backed by a set of PostgreSQL tables. The aggregates can be recalculated either automatically or manually. After an aggregate is recalculated, the new values are stored in the hypertable, from which they can be captured and streamed. Data from the aggregates is streamed to different topics, based on the chunk in which it is stored. The TimescaleDB transformation reassembles data that was streamed to different topics and routes it to a single topic.

  • The transformation has access to TimescaleDB metadata to obtain mappings between chunks and hypertables, and between hypertables and aggregates.

  • The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern: <prefix>.<aggregate-schema-name>.<aggregate-name>

  • The transformation adds the following headers to the event:

    __debezium_timescaledb_hypertable_table

    The name of the hypertable that stores the continuous aggregate.

    __debezium_timescaledb_hypertable_schema

    The name of the schema that the hypertable belongs to.

    __debezium_timescaledb_chunk_table

    The name of the physical table that stores the continuous aggregate.

    __debezium_timescaledb_chunk_schema

    The name of the schema that the physical table belongs to.

Example: Streaming data from a continuous aggregate

The following example shows a SQL command for creating a continuous aggregate conditions_summary in the public schema.

  CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
    SELECT
      location,
      time_bucket(INTERVAL '1 hour', time) AS bucket,
      AVG(temperature),
      MAX(temperature),
      MIN(temperature)
    FROM conditions
    GROUP BY location, bucket;

The TimescaleDB SMT routes the change events captured in the aggregate to a topic with the name timescaledb.public.conditions_summary. The transformation enriches event messages with headers that you define in the configuration. For example:

  __debezium_timescaledb_chunk_table: _hyper_2_2_chunk
  __debezium_timescaledb_chunk_schema: _timescaledb_internal
  __debezium_timescaledb_hypertable_table: _materialized_hypertable_2
  __debezium_timescaledb_hypertable_schema: _timescaledb_internal
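
For continuous aggregates, the lookup has two steps: chunk to hypertable, then hypertable to aggregate. The following Python sketch models that resolution for illustration only. It is not the Debezium implementation; both dictionaries are hypothetical stand-ins for TimescaleDB metadata, and the route_aggregate_event function is invented for this example.

```python
# Illustrative model of continuous-aggregate routing; not Debezium's code.
# Both dictionaries are hypothetical stand-ins for TimescaleDB metadata.
CHUNK_TO_HYPERTABLE = {
    ("_timescaledb_internal", "_hyper_2_2_chunk"):
        ("_timescaledb_internal", "_materialized_hypertable_2"),
}
HYPERTABLE_TO_AGGREGATE = {
    ("_timescaledb_internal", "_materialized_hypertable_2"):
        ("public", "conditions_summary"),
}


def route_aggregate_event(chunk_schema, chunk_table, target_prefix="timescaledb"):
    """Resolve chunk -> hypertable -> aggregate, then build topic and headers."""
    hypertable_schema, hypertable_table = CHUNK_TO_HYPERTABLE[(chunk_schema, chunk_table)]
    aggregate_schema, aggregate_name = HYPERTABLE_TO_AGGREGATE[
        (hypertable_schema, hypertable_table)
    ]
    headers = {
        "__debezium_timescaledb_chunk_table": chunk_table,
        "__debezium_timescaledb_chunk_schema": chunk_schema,
        "__debezium_timescaledb_hypertable_table": hypertable_table,
        "__debezium_timescaledb_hypertable_schema": hypertable_schema,
    }
    return f"{target_prefix}.{aggregate_schema}.{aggregate_name}", headers


agg_topic, agg_headers = route_aggregate_event("_timescaledb_internal", "_hyper_2_2_chunk")
print(agg_topic)
print(agg_headers)
```

The topic name is derived from the aggregate view, not from the materialized hypertable that backs it, which is why the hypertable name survives only in the headers.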

Compression

The TimescaleDB SMT does not apply any special processing to compression functions. Compressed chunks are forwarded unchanged to the next downstream job in the pipeline for further processing as needed. Typically, messages with compressed chunks are dropped, and are not processed by subsequent jobs in the pipeline.

TimescaleDB configuration

Debezium uses replication slots to capture changes from TimescaleDB and PostgreSQL. Replication slots can provide data in multiple message formats. Typically, it is best to configure Debezium to use the pgoutput decoder, the default decoder for TimescaleDB instances, to read from the slot.

To configure the replication slot, specify the following in the postgresql.conf file:

  # REPLICATION
  wal_level = logical (1)

(1) Instructs the server to use logical decoding with the write-ahead log.

To configure tables for replication, you must create a publication, as shown in the following example:

  CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update');

You can create publications globally, as in the preceding example, or create separate publications for each table. Because TimescaleDB creates tables automatically, as needed, the use of global publications is strongly recommended.

Connector configuration

Configure the connector in the same way that you would configure a plain PostgreSQL connector. To enable the connector to correctly process events from TimescaleDB, add the following SMT options to the connector configuration:

  "transforms": "timescaledb",
  "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
  "transforms.timescaledb.database.hostname": "timescaledb",
  "transforms.timescaledb.database.port": "...",
  "transforms.timescaledb.database.user": "...",
  "transforms.timescaledb.database.password": "...",
  "transforms.timescaledb.database.dbname": "..."

Connector configuration example

The following example shows the configuration for setting up a PostgreSQL connector to connect to a TimescaleDB server with the logical name dbserver1 on port 5432 at 192.168.99.100. Typically, you configure the Debezium PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.

You can choose to produce events for a subset of the schemas and tables in a database. Optionally, you can ignore, mask, or truncate columns that contain sensitive data, that exceed a specified size, or that you do not need.

  {
    "name": "timescaledb-connector", (1)
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
      "database.hostname": "192.168.99.100", (3)
      "database.port": "5432", (4)
      "database.user": "postgres", (5)
      "database.password": "postgres", (6)
      "database.dbname" : "postgres", (7)
      "topic.prefix": "dbserver1", (8)
      "plugin.name": "pgoutput", (9)
      "schema.include.list": "_timescaledb_internal", (10)
      "transforms": "timescaledb", (11)
      "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
      "transforms.timescaledb.database.hostname": "timescaledb", (13)
      "transforms.timescaledb.database.port": "5432", (14)
      "transforms.timescaledb.database.user": "postgres", (15)
      "transforms.timescaledb.database.password": "postgres", (16)
      "transforms.timescaledb.database.dbname": "postgres" (17)
    }
  }

(1) The name of the connector when registered with a Kafka Connect service.
(2) The name of this PostgreSQL connector class.
(3) The address of the TimescaleDB server.
(4) The port number of the TimescaleDB server.
(5) The name of the TimescaleDB user.
(6) The password of the TimescaleDB user.
(7) The name of the TimescaleDB database to connect to.
(8) The topic prefix for the TimescaleDB server or cluster. This prefix forms a namespace, and is used in the names of all Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema, when the Avro converter is used.
(9) Indicates use of the pgoutput logical decoding plug-in.
(10) A list of all schemas that contain TimescaleDB physical tables.
(11)-(12) Enable the SMT to process raw TimescaleDB events.
(13)-(17) Provide TimescaleDB connection information for the SMT. The values must point to the same database as items 3 through 7.
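
Because the SMT connection settings are declared separately from the connector's own settings, the two can drift apart. The following Python sketch shows one way to generate the configuration from a single set of database settings and to check the result. The build_connector and mismatched_smt_settings helpers are hypothetical names invented for this example and are not part of Debezium. The hostname is deliberately excluded from the check because, as in the example above, the two settings can name the same server differently.

```python
import json

SMT_PREFIX = "transforms.timescaledb."
# Hostname is left out on purpose: the connector and the SMT can refer to the
# same server by different names (IP address versus DNS name).
SHARED_KEYS = ("database.port", "database.user", "database.password", "database.dbname")


def mismatched_smt_settings(config):
    """Return the shared keys whose SMT value differs from the connector value."""
    return [key for key in SHARED_KEYS if config.get(SMT_PREFIX + key) != config.get(key)]


def build_connector(database, smt_hostname):
    """Build a registration payload, copying shared database settings to the SMT."""
    config = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "dbserver1",
        "plugin.name": "pgoutput",
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        SMT_PREFIX + "type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        SMT_PREFIX + "database.hostname": smt_hostname,
    }
    for key, value in database.items():
        config[key] = value
        if key in SHARED_KEYS:
            config[SMT_PREFIX + key] = value
    return {"name": "timescaledb-connector", "config": config}


database = {
    "database.hostname": "192.168.99.100",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
}
connector = build_connector(database, smt_hostname="timescaledb")
print(json.dumps(connector, indent=2))
print(mismatched_smt_settings(connector["config"]))
```

The printed JSON has the same shape as the example configuration and could be submitted to the Kafka Connect REST API with a POST request to the /connectors endpoint.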

Configuration options

The following table lists the configuration options that you can set for the TimescaleDB integration SMT.

Table 1. TimescaleDB integration SMT (TimescaleDB) configuration options

| Property | Default | Description |
|---|---|---|
| database.hostname | No default | IP address or hostname of the TimescaleDB database server. |
| database.port | 5432 | Integer port number of the TimescaleDB database server. |
| database.user | No default | Name of the TimescaleDB database user for connecting to the TimescaleDB database server. |
| database.password | No default | Password to use when connecting to the TimescaleDB database server. |
| database.dbname | No default | The name of the TimescaleDB database from which to stream changes. |
| schema.list | _timescaledb_internal | Comma-separated list of schema names that contain TimescaleDB raw (internal) data tables. The SMT processes only those changes that originate in one of the schemas in the list. |
| target.topic.prefix | timescaledb | The namespace (prefix) of topics to which TimescaleDB events are routed. The SMT routes messages into topics named `<prefix>.<schema>.<hypertable