Partition Routing

Example: Basic configuration

You configure the partition routing transformation in the Debezium connector’s Kafka Connect configuration. The configuration specifies the following parameters:

partition.payload.fields

Specifies the fields in the event payload that the SMT uses to calculate the destination partition. You can use dot notation to specify nested payload fields.

partition.topic.num

Specifies the number of partitions in the destination topic.

partition.hash.function

Specifies the hash function that the SMT uses to compute a hash of the specified field values. The hash determines the number of the destination partition.
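The following Java sketch shows how these three parameters can work together. It is a simplified model, not the Debezium source code. It assumes that the SMT adds up the java hash codes (Object::hashCode) of the selected field values and reduces the sum modulo partition.topic.num with Math.floorMod. The class name PartitionSketch is hypothetical.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch, NOT the Debezium implementation. Assumption: the SMT
// sums the hash codes of the selected field values and reduces the sum
// modulo partition.topic.num.
final class PartitionSketch {

    // Computes a destination partition from the values of the configured
    // payload fields, using the "java" hash function (Object::hashCode).
    static int computePartition(List<?> fieldValues, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("partition.topic.num must be positive");
        }
        int combined = 0;
        for (Object value : fieldValues) {
            // Objects.hashCode returns 0 for a null value.
            combined += Objects.hashCode(value);
        }
        // floorMod keeps the result in [0, numPartitions) even when the
        // combined hash is negative, for example after integer overflow.
        return Math.floorMod(combined, numPartitions);
    }
}
```

Because the result depends only on the field values and the partition count, events with equal values always land in the same partition, as long as partition.topic.num does not change.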

By default, Debezium routes all change event records for a configured data collection to a single Apache Kafka topic. Connectors do not direct event records to specific partitions in the topic.

To configure a Debezium connector to route events to a specific partition, configure the PartitionRouting SMT in the Kafka Connect configuration for the Debezium connector.

For example, you might add the following settings to your connector configuration:

  ...
  topic.creation.default.partitions=2
  topic.creation.default.replication.factor=1
  ...
  topic.prefix=fulfillment
  transforms=PartitionRouting
  transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
  transforms.PartitionRouting.partition.payload.fields=change.name
  transforms.PartitionRouting.partition.topic.num=2
  transforms.PartitionRouting.predicate=allTopic
  predicates=allTopic
  predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
  predicates.allTopic.pattern=fulfillment.*
  ...

Based on the preceding configuration, whenever the SMT receives a message that is bound for a topic whose name begins with the prefix fulfillment, it redirects the message to a specific partition of that topic.
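The predicate pattern is a Java regular expression. The following sketch illustrates which topic names the example pattern selects. It assumes that TopicNameMatches requires the pattern to match the entire topic name (as Matcher.matches() does). The class TopicFilterSketch and the stricter LITERAL_PREFIX variant are hypothetical illustrations, not part of the example configuration.

```java
import java.util.regex.Pattern;

// Sketch (not Kafka or Debezium code) of how a TopicNameMatches-style
// predicate decides whether the PartitionRouting SMT applies to a record.
// Assumption: the pattern must match the whole topic name.
final class TopicFilterSketch {

    // Same value as predicates.allTopic.pattern in the example configuration.
    // In a regex, "." matches any character, so this is a prefix match.
    static final Pattern ALL_TOPIC = Pattern.compile("fulfillment.*");

    // Hypothetical stricter variant that requires a literal dot after the prefix.
    static final Pattern LITERAL_PREFIX = Pattern.compile("fulfillment\\..*");

    static boolean matches(Pattern pattern, String topic) {
        return pattern.matcher(topic).matches();
    }
}
```

Because the dot in fulfillment.* is a regex wildcard, the example pattern also matches a topic such as fulfillments_archive; escaping the dot narrows the match to topics that use the fulfillment. prefix.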

The SMT computes the target partition from a hash of the value of the name field in the message payload. By specifying the allTopic predicate, the configuration applies the SMT selectively, only to messages whose topic names match the predicate pattern. The change prefix is a special keyword that enables the SMT to automatically refer to the elements in the payload that describe the before or after state of the data.

If a specified field is not present in the event message, the SMT ignores it. If none of the specified fields exist in the message, the transformation ignores the event message entirely and delivers the original version of the message to the default destination topic.

The number of partitions specified by the partition.topic.num setting in the SMT configuration must match the number of partitions specified by the Kafka Connect configuration. For example, in the preceding configuration example, the value of the Kafka Connect property topic.creation.default.partitions matches the partition.topic.num value in the SMT configuration.
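The following sketch models how field paths such as change.name could be resolved against a change event, including skipping fields that are missing. It represents the event payload as nested Maps instead of Kafka Connect Structs, and it assumes that change resolves to before for delete events (op = "d") and to after for all other operations. The class FieldResolverSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of resolving partition.payload.fields against a change
// event modeled as nested Maps. Assumption: "change" means "before" for
// deletes (op = "d") and "after" for every other operation.
final class FieldResolverSketch {

    static List<Object> resolve(Map<String, Object> payload, List<String> fields) {
        List<Object> values = new ArrayList<>();
        for (String field : fields) {
            String[] path = field.split("\\.");
            if (path[0].equals("change")) {
                path[0] = "d".equals(payload.get("op")) ? "before" : "after";
            }
            // Walk the dot-separated path; any missing level yields null.
            Object current = payload;
            for (String segment : path) {
                current = (current instanceof Map) ? ((Map<?, ?>) current).get(segment) : null;
            }
            // A field that is not present in the event is skipped.
            if (current != null) {
                values.add(current);
            }
        }
        // An empty result means that none of the fields exist; in that case
        // the SMT leaves the message unchanged.
        return values;
    }
}
```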

Consider the following products table:

Table 1. Products table

  | id  | name               | description                                             | weight |
  | --- | ------------------ | ------------------------------------------------------- | ------ |
  | 101 | scooter            | Small 2-wheel scooter                                   | 3.14   |
  | 102 | car battery        | 12V car battery                                         | 8.1    |
  | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8    |
  | 104 | hammer             | 12oz carpenter’s hammer                                 | 0.75   |
  | 105 | hammer             | 14oz carpenter’s hammer                                 | 0.875  |
  | 106 | hammer             | 16oz carpenter’s hammer                                 | 1.0    |
  | 107 | rocks              | box of assorted rocks                                   | 5.3    |
  | 108 | jacket             | water resistent black wind breaker                      | 0.1    |
  | 109 | spare tire         | 24 inch spare tire                                      | 22.2   |

Based on the configuration, the SMT routes change events for records whose name field has the value hammer to the same partition. That is, the change events for the items with id values 104, 105, and 106 are routed to the same partition.
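To see why the three hammer rows share a partition, the following sketch applies an assumed partition formula to every row of the products table: the java hash of the name value (String.hashCode), reduced modulo partition.topic.num = 2. The formula and the class ProductsPartitionSketch are illustrative assumptions, not the Debezium implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: maps each product id to a partition using an assumed formula,
// floorMod(name.hashCode(), partition.topic.num). Not Debezium code.
final class ProductsPartitionSketch {

    static int partitionFor(String name, int numPartitions) {
        return Math.floorMod(name.hashCode(), numPartitions);
    }

    static Map<Integer, Integer> partitionsById() {
        // id -> name, copied from the products table.
        Map<Integer, String> names = new LinkedHashMap<>();
        names.put(101, "scooter");
        names.put(102, "car battery");
        names.put(103, "12-pack drill bits");
        names.put(104, "hammer");
        names.put(105, "hammer");
        names.put(106, "hammer");
        names.put(107, "rocks");
        names.put(108, "jacket");
        names.put(109, "spare tire");

        // partition.topic.num=2, as in the example configuration.
        Map<Integer, Integer> result = new LinkedHashMap<>();
        names.forEach((id, name) -> result.put(id, partitionFor(name, 2)));
        return result;
    }
}
```

Identical name values always hash to the same partition. Different values can also share a partition, because nine distinct rows are spread across only two partitions.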

Example: Advanced configuration

Suppose that you want to route events from two data collections (t1, t2) to the same topic (for example, my_topic), and you want to partition events from data collection t1 by using field f1, and partition events from data collection t2 by using field f2.

You could apply the following configuration:

  transforms=PartitionRouting
  transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
  transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
  transforms.PartitionRouting.partition.topic.num=2
  transforms.PartitionRouting.predicate=myTopic
  predicates=myTopic
  predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
  predicates.myTopic.pattern=my_topic
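A single field list can serve both data collections because the SMT skips fields that are absent from an event. In the following sketch, an event from t1 carries only f1 and an event from t2 carries only f2, so each event is partitioned by its own field. The sketch models the change element as a Map and assumes the same sum-of-hash-codes formula as the earlier sketches; the class MultiCollectionSketch is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical sketch (not Debezium code) of the advanced example. Assumption:
// partition = floorMod(sum of java hash codes of the present fields, num).
final class MultiCollectionSketch {

    // Mirrors partition.payload.fields=change.f1,change.f2 without the prefix.
    static final List<String> FIELDS = List.of("f1", "f2");

    static OptionalInt partition(Map<String, Object> change, int numPartitions) {
        boolean found = false;
        int combined = 0;
        for (String field : FIELDS) {
            // Fields that are missing from the event are skipped.
            if (change.containsKey(field)) {
                found = true;
                combined += Objects.hashCode(change.get(field));
            }
        }
        // No configured field is present: the SMT leaves the event unchanged.
        return found ? OptionalInt.of(Math.floorMod(combined, numPartitions)) : OptionalInt.empty();
    }
}
```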

The preceding configuration does not specify how to re-route events so that they are sent to a specific destination topic. For information about how to send events to topics other than their default destination topics, see the Topic Routing SMT.

Migrating from the Debezium ComputePartition SMT

The Debezium ComputePartition SMT will be discontinued in a future release. The following section describes how to migrate from the ComputePartition SMT to the new PartitionRouting SMT.

Assuming that the configuration sets the same number of partitions for all topics, replace the following ComputePartition configuration with a PartitionRouting SMT configuration. The following examples provide a comparison of the two configurations.

Example: Legacy ComputePartition configuration

  ...
  topic.creation.default.partitions=2
  topic.creation.default.replication.factor=1
  ...
  topic.prefix=fulfillment
  transforms=ComputePartition
  transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
  transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
  transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
  ...

Replace the preceding ComputePartition configuration with the following PartitionRouting configuration.

Example: PartitionRouting configuration that replaces the earlier ComputePartition configuration

  ...
  topic.creation.default.partitions=2
  topic.creation.default.replication.factor=1
  ...
  topic.prefix=fulfillment
  transforms=PartitionRouting
  transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
  transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
  transforms.PartitionRouting.partition.topic.num=2
  transforms.PartitionRouting.predicate=allTopic
  predicates=allTopic
  predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
  predicates.allTopic.pattern=fulfillment.*
  ...
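The field list in the new configuration can be derived from the legacy field.mappings value by dropping the data collection name and adding the change prefix. The following hypothetical helper, MigrationSketch, is not part of Debezium; it only illustrates that mapping, assuming each legacy entry has the form collection:field.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical migration helper (not part of Debezium). Converts the value of
// partition.data-collections.field.mappings into a partition.payload.fields value.
final class MigrationSketch {

    static String toPayloadFields(String legacyFieldMappings) {
        Set<String> fields = new LinkedHashSet<>(); // keeps order, drops duplicates
        for (String mapping : legacyFieldMappings.split(",")) {
            String trimmed = mapping.trim();
            // The collection name may contain dots, so split on the last colon.
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected <collection>:<field> but got: " + trimmed);
            }
            fields.add("change." + trimmed.substring(colon + 1));
        }
        return String.join(",", fields);
    }
}
```

Note that the new property no longer ties a field to a data collection: every listed field is looked up in every matching event, and absent fields are skipped.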

If the SMT emits events to topics that do not share the same number of partitions, you must specify a separate partition count for each topic. In the legacy ComputePartition configuration, you set these values in the partition.data-collections.partition.num.mappings property. In the following example, the topic for the products data collection is configured with 3 partitions, and the topic for the orders data collection is configured with 2 partitions:

Example: Legacy ComputePartition configuration that sets unique partition values for different topics

  ...
  topic.prefix=fulfillment
  transforms=ComputePartition
  transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
  transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
  transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
  ...

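Replace the preceding ComputePartition configuration with the following PartitionRouting configuration, which defines a separate SMT instance, with its own predicate and partition.topic.num value, for each topic.

Example: PartitionRouting configuration that sets unique partition.topic.num values for different topics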

  ...
  topic.prefix=fulfillment
  transforms=ProductsPartitionRouting,OrdersPartitionRouting
  transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
  transforms.ProductsPartitionRouting.partition.payload.fields=change.name
  transforms.ProductsPartitionRouting.partition.topic.num=3
  transforms.ProductsPartitionRouting.predicate=products
  transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
  transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
  transforms.OrdersPartitionRouting.partition.topic.num=2
  transforms.OrdersPartitionRouting.predicate=orders
  predicates=products,orders
  predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
  predicates.products.pattern=fulfillment.inventory.products
  predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
  predicates.orders.pattern=fulfillment.inventory.orders
  ...
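The following sketch models the two-instance chain to show how each topic ends up with its own partition count. It assumes that each SMT instance is bound to the predicate whose pattern names its own topic, and that an instance runs only when its TopicNameMatches pattern matches the whole topic name. The class ChainSketch is hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model (not Debezium code) of a chain of PartitionRouting
// instances, each gated by its own TopicNameMatches predicate.
final class ChainSketch {

    static final class Instance {
        final String name;
        final Pattern predicate;
        final int numPartitions;

        Instance(String name, String pattern, int numPartitions) {
            this.name = name;
            this.predicate = Pattern.compile(pattern);
            this.numPartitions = numPartitions;
        }
    }

    // Names, patterns, and partition counts taken from the example configuration.
    static final List<Instance> CHAIN = List.of(
            new Instance("ProductsPartitionRouting", "fulfillment.inventory.products", 3),
            new Instance("OrdersPartitionRouting", "fulfillment.inventory.orders", 2));

    // Returns the first instance whose predicate matches the topic, or null if
    // none does. The two patterns do not overlap, so at most one instance applies.
    static Instance instanceFor(String topic) {
        for (Instance instance : CHAIN) {
            if (instance.predicate.matcher(topic).matches()) {
                return instance;
            }
        }
        return null;
    }
}
```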

Configuration options

The following table lists the configuration options that you can set for the partition routing SMT.

Table 2. Partition routing SMT (PartitionRouting) configuration options

partition.payload.fields

Default: (none)

Specifies the fields in the event payload that the SMT uses to calculate the target partition. Use dot notation to specify nested fields. To access fields related to data collections, you can use the after, before, or change prefix. change is a special prefix that causes the SMT to read the field automatically from the after or before element, depending on the type of operation. If a specified field is not present in a record, the SMT skips it. For example, after.name,source.table,change.name

partition.topic.num

Default: (none)

The number of partitions for the topic on which this SMT acts. Use the TopicNameMatches predicate to filter records by topic.

partition.hash.function

Default: java

Specifies the hash function that the SMT uses to compute a hash of the field values. The hash determines the number of the destination partition. Possible values are:

java - the standard Java Object::hashCode function

murmur - the latest version of the MurmurHash function, MurmurHash3

This property is optional. If it is not specified, or if an invalid value is specified, the SMT uses the default value, java.
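The following sketch illustrates the partition.hash.function option, including the fallback to java for missing or invalid values. The java choice uses Object::hashCode. For murmur, the sketch assumes MurmurHash3 (x86, 32-bit, seed 0) applied to the UTF-8 bytes of String.valueOf(value); how Debezium actually feeds field values into MurmurHash3 may differ. The class HashFunctionSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.ToIntFunction;

// Hypothetical sketch of the partition.hash.function option (not Debezium code).
// Assumption: murmur = MurmurHash3_x86_32, seed 0, over UTF-8 bytes of the value.
final class HashFunctionSketch {

    static ToIntFunction<Object> select(String configured) {
        if (configured != null && configured.trim().toLowerCase(Locale.ROOT).equals("murmur")) {
            return value -> murmur3(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
        }
        // Missing or unrecognized values fall back to the default, java.
        return Object::hashCode;
    }

    // MurmurHash3_x86_32 with seed 0.
    static int murmur3(byte[] data) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = 0;
        int blocks = data.length / 4;
        for (int i = 0; i < blocks; i++) {
            int j = 4 * i; // little-endian 4-byte block
            int k1 = (data[j] & 0xff) | (data[j + 1] & 0xff) << 8
                    | (data[j + 2] & 0xff) << 16 | (data[j + 3] & 0xff) << 24;
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Mix in the remaining 1 to 3 tail bytes.
        int k1 = 0;
        int tail = blocks * 4;
        switch (data.length & 3) {
            case 3: k1 ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: k1 ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                k1 ^= (data[tail] & 0xff);
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
            default:
                break;
        }
        // Finalization mix (fmix32).
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }
}
```

For example, murmur3 of the UTF-8 bytes of "foo" is -156908512, the usual 32-bit MurmurHash3 value for that input with seed 0. Whichever function you choose, keep it fixed for a given topic, because changing it changes which partition existing key values map to.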