Topic Routing

Use case

The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.

Logical tables

A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.

Partitioned PostgreSQL tables

When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the ByLogicalTableRouter SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.

Example

To route change event records for multiple physical tables to the same topic, configure the ByLogicalTableRouter transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the ByLogicalTableRouter SMT requires you to specify regular expressions that determine:

  • The tables for which to route records. These tables must all have the same schema.

  • The destination topic name.

For example, configuration in a .properties file looks like this:

  1. transforms=Reroute
  2. transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
  3. transforms.Reroute.topic.regex=(.*)customers_shard(.*)
  4. transforms.Reroute.topic.replacement=$1customers_all_shards

topic.regex

Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

In the example, the regular expression, (.**)customers_shard(.**) matches records for changes to tables whose names include the customers_shard string. This would re-route records for tables with the following names:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement

Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.

Ensure unique key

A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.

To ensure that each event key is unique across the tables whose change event records go to the same topic, the ByLogicalTableRouter transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.

If you want to, you can configure the ByLogicalTableRouter transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:

  1. transforms=Reroute
  2. transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
  3. transforms.Reroute.topic.regex=(.*)customers_shard(.*)
  4. transforms.Reroute.topic.replacement=$1customers_all_shards
  5. transforms.Reroute.key.field.name=shard_id

This example adds the shard_id field to the key structure in routed records.

If you want to adjust the value of the key’s new field, configure both of these options:

key.field.regex

Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.

key.field.replacement

Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.

For example:

  1. transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
  2. transforms.Reroute.key.field.replacement=$2

With this configuration, suppose that the default destination topic names are:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.

If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness property to false:

  1. ...
  2. transforms.Reroute.key.enforce.uniqueness=false
  3. ...

Configuration options

Property

Default

Description

Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. To refer to a group, specify $1, $2, and so on.

true

Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables.

Specify false if you do not want the transformation to add a key field. For example, if you are routing records from a partitioned PostgreSQL table to one topic, you can configure key.enforce.uniqueness=false because unique keys are guaranteed in partitioned PostgreSQL tables.

dbzphysicalTableIdentifier

Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.

Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.

Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.