TimescaleDB Integration
Installation
Install TimescaleDB as described in the TimescaleDB documentation.
Install the Debezium PostgreSQL connector according to the instructions in the Debezium installation guide.
Configure TimescaleDB, and deploy the connector.
How it works
Debezium can capture events from the following TimescaleDB functions:
Hypertables
Continuous aggregates
Compression
These three functions are internally dependent. Each of them is built on the PostgreSQL functionality for storing data in tables. Debezium supports all three functions to differing degrees.
The SMT needs access to TimescaleDB metadata. Because the SMT cannot access the database configuration at the connector level, you must explicitly define the database connection details in the transformation configuration.
Hypertables
A hypertable is a logical table that is used to store time-series data. Data is chunked (partitioned) according to a defined time-bound column. TimescaleDB creates one or more physical tables in its internal schema, with each table representing a single chunk. By default, the connector captures changes from each chunk table, and streams the changes to the individual topics that correspond to each chunk. The TimescaleDB transformation reassembles data from the separate topics, and then streams the reassembled data to a single topic.
The transformation has access to TimescaleDB metadata to obtain chunk/hypertable mapping.
The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern:
<prefix>.<hypertable-schema-name>.<hypertable-name>
The transformation adds the following headers to the event:
__debezium_timescaledb_chunk_table
The name of the physical table that stores the event data.
__debezium_timescaledb_chunk_schema
The name of the schema that the physical table belongs to.
Example: Streaming data from a hypertable
The following example shows a SQL command for creating a conditions hypertable in the public schema:
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');
The TimescaleDB SMT routes change events captured in the hypertable to a topic with the name timescaledb.public.conditions. The transformation enriches event messages with headers that you define in the configuration. For example:
__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
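To see the chunk-to-hypertable mapping that the transformation relies on, you can ask TimescaleDB directly. The following query is a minimal sketch; the chunk name in the comment is illustrative, because actual chunk names depend on your data and chunk interval.

-- List the physical chunk tables that currently back the conditions hypertable.
-- Typical output: _timescaledb_internal._hyper_1_1_chunk (names vary by installation).
SELECT show_chunks('conditions');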
Continuous aggregates
Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables. The aggregate view is backed by its own hypertable, which in turn is backed by a set of PostgreSQL tables. The aggregates can be recalculated either automatically or manually. After an aggregate is recalculated, the new values are stored in the hypertable, from which they can be captured and streamed. Data from the aggregates is streamed to different topics, based on the chunk in which it is stored. The TimescaleDB transformation reassembles data that was streamed to different topics and routes it to a single topic.
The transformation has access to TimescaleDB metadata to obtain mappings between chunks and hypertables, and between hypertables and aggregates.
The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern:
<prefix>.<aggregate-schema-name>.<aggregate-name>
The transformation adds the following headers to the event:
__debezium_timescaledb_hypertable_table
The name of the hypertable that stores the continuous aggregate.
__debezium_timescaledb_hypertable_schema
The name of the schema that the hypertable belongs to.
__debezium_timescaledb_chunk_table
The name of the physical table that stores the continuous aggregate.
__debezium_timescaledb_chunk_schema
The name of the schema that the physical table belongs to.
Example: Streaming data from a continuous aggregate
The following example shows a SQL command for creating a continuous aggregate conditions_summary in the public schema:
CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY location, bucket;
The TimescaleDB SMT routes the change events captured in the aggregate to a topic with the name timescaledb.public.conditions_summary. The transformation enriches event messages with headers that you define in the configuration. For example:
__debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal
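New rows reach the aggregate's backing hypertable, and therefore the connector, only when the aggregate is refreshed. The following sketch shows the manual and automatic refresh paths; the offsets and schedule interval are arbitrary example values, not recommendations.

-- Manual refresh: recompute the aggregate over its entire time range.
CALL refresh_continuous_aggregate('conditions_summary', NULL, NULL);

-- Automatic refresh: recompute recent buckets on a schedule.
SELECT add_continuous_aggregate_policy('conditions_summary',
    start_offset      => INTERVAL '1 day',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');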
Compression
The TimescaleDB SMT does not apply any special processing to compression functions. Compressed chunks are forwarded unchanged to the next downstream job in the pipeline for further processing as needed. Typically, messages with compressed chunks are dropped, and are not processed by subsequent jobs in the pipeline.
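For context, compression must be enabled on the hypertable before any compressed chunks appear in the change stream. The following is a minimal sketch that uses the conditions hypertable from the earlier example; the segmentby column and the seven-day policy interval are arbitrary choices.

-- Allow chunks of the hypertable to be compressed, segmented by location.
ALTER TABLE conditions SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'location');

-- Automatically compress chunks older than seven days.
SELECT add_compression_policy('conditions', INTERVAL '7 days');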
TimescaleDB configuration
Debezium uses replication slots to capture changes from TimescaleDB and PostgreSQL. Replication slots can store data in multiple message formats. Typically, it’s best to configure Debezium to use the pgoutput decoder, the default decoder for TimescaleDB instances, to read from the slot.
To configure the replication slot, specify the following in the postgresql.conf file:
# REPLICATION
wal_level = logical (1)
1 | Instructs the server to use logical decoding with the write-ahead log. |
To configure tables for replication, you must create a publication, as shown in the following example:
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update');
You can create publications globally, as in the preceding example, or create separate publications for each table. Because TimescaleDB creates tables automatically as needed, the use of a global publication is strongly recommended.
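For comparison, a per-table publication limited to a single chunk looks like the following sketch (the publication name is hypothetical, and the chunk table is taken from the earlier hypertable example). Because chunks that TimescaleDB creates later are not added to such a publication automatically, this approach requires ongoing maintenance.

-- Publication restricted to one existing chunk table.
-- New chunks created by TimescaleDB are NOT added automatically.
CREATE PUBLICATION dbz_chunk_publication
    FOR TABLE _timescaledb_internal._hyper_1_1_chunk
    WITH (publish = 'insert, update');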
Connector configuration
Configure the TimescaleDB SMT in the same way that you would configure the PostgreSQL connector. To enable the connector to correctly process events from TimescaleDB, add the following options to the connector configuration:
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "...",
"transforms.timescaledb.database.user": "...",
"transforms.timescaledb.database.password": "...",
"transforms.timescaledb.database.dbname": "..."
Connector configuration example
The following example shows the configuration for setting up a PostgreSQL connector to connect to a TimescaleDB server with the logical name dbserver1 on port 5432 at 192.168.99.100. Typically, you configure the Debezium PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.
You can choose to produce events for a subset of the schemas and tables in a database. Optionally, you can ignore, mask, or truncate columns that contain sensitive data, that exceed a specified size, or that you do not need.
{
"name": "timescaledb-connector", (1)
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "5432", (4)
"database.user": "postgres", (5)
"database.password": "postgres", (6)
"database.dbname" : "postgres", (7)
"topic.prefix": "dbserver1", (8)
"plugin.name": "pgoutput", (9)
"schema.include.list": "_timescaledb_internal", (10)
"transforms": "timescaledb", (11)
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
"transforms.timescaledb.database.hostname": "timescaledb", (13)
"transforms.timescaledb.database.port": "5432", (14)
"transforms.timescaledb.database.user": "postgres", (15)
"transforms.timescaledb.database.password": "postgres", (16)
"transforms.timescaledb.database.dbname": "postgres" (17)
}
}
1 | The name of the connector when registered with a Kafka Connect service. |
2 | The name of this PostgreSQL connector class. |
3 | The address of the TimescaleDB server. |
4 | The port number of the TimescaleDB server. |
5 | The name of the TimescaleDB user. |
6 | The password for the TimescaleDB user. |
7 | The name of the TimescaleDB database to connect to. |
8 | The topic prefix for the TimescaleDB server or cluster. This prefix forms a namespace, and is used in the names of all Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema, when the Avro converter is used. |
9 | Indicates use of the pgoutput logical decoding plug-in. |
10 | A list of all schemas that contain TimescaleDB physical tables. |
11 | The name of the transformation that the connector applies (timescaledb). |
12 | The SMT class that processes raw TimescaleDB events. |
13 - 17 | Provides TimescaleDB connection information for the SMT. The values must match the values of items 3 - 7. |
Configuration options
The following table lists the configuration options that you can set for the TimescaleDB integration SMT.
Property | Default | Description |
database.hostname | No default | IP address or hostname of the TimescaleDB database server. |
database.port | 5432 | Integer port number of the TimescaleDB database server. |
database.user | No default | Name of the TimescaleDB database user for connecting to the TimescaleDB database server. |
database.password | No default | Password to use when connecting to the TimescaleDB database server. |
database.dbname | No default | The name of the TimescaleDB database from which to stream changes. |
schema.list | _timescaledb_internal | Comma-separated list of schema names that contain TimescaleDB raw (internal) data tables. The SMT processes only those changes that originate in one of the schemas in the list. |
target.topic.prefix | timescaledb | The namespace (prefix) of topics to which TimescaleDB events are routed. The SMT routes messages into topics named <prefix>.<schema>.<hypertable or aggregate name>. |