Message Filtering

Set up

For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-2.2.1.Final.tar.gz.

To use the filter SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.

IMPORTANT: After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that only authorized users can run scripting expressions, secure the Kafka Connect instance and its configuration interface before you add the filter SMT.

With ZooKeeper, Kafka, Kafka Connect, and one or more Debezium connectors installed, the remaining tasks to install the filter SMT are:

  1. Download the scripting SMT archive.

  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.

  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.

  4. Restart your Kafka Connect process to pick up the new JAR files.

The Groovy language needs the following libraries on the classpath:

  • groovy

  • groovy-json (optional)

  • groovy-jsr223

The JavaScript language needs the following libraries on the classpath:

  • graalvm.js

  • graalvm.js.scriptengine

Example: Basic configuration

You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.

To configure a Debezium connector to filter change event records, configure the filter SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify an expression, written in a supported scripting language, that defines the filtering criteria.

For example, you might add the following configuration in your connector configuration.

    ...
    transforms=filter
    transforms.filter.type=io.debezium.transforms.Filter
    transforms.filter.language=jsr223.groovy
    transforms.filter.condition=value.op == 'u' && value.before.id == 2
    ...

The preceding example specifies the use of the Groovy expression language. The expression value.op == 'u' && value.before.id == 2 removes all messages, except those that represent update (u) records with id values that are equal to 2.

Customizing the configuration

The preceding example shows a simple SMT configuration that is designed to process only DML events, which contain an op field. Other types of messages that a connector might emit (heartbeat messages, tombstone messages, or metadata messages about schema changes and transactions) do not contain this field. To avoid processing failures, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
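As a rough sketch of such a predicate, the following fragment uses the TopicNameMatches predicate that ships with Apache Kafka Connect so that the filter runs only on topics whose names match a pattern. The predicate alias IsDataTopic and the pattern inventory[.]inventory[.].* are hypothetical and stand in for the topic naming of your own connector.

```properties
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
# Apply the filter only to records for which the predicate is true.
transforms.filter.predicate=IsDataTopic

predicates=IsDataTopic
predicates.IsDataTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical pattern: intended to match only the data change topics of one database.
predicates.IsDataTopic.pattern=inventory[.]inventory[.].*
```

Records on topics that do not match the pattern, such as heartbeat or schema change topics, bypass the filter and are passed on unchanged. The character class [.] is used instead of a backslash escape so that the pattern reads the same in a properties file and in a JSON configuration.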

Variables for use in filter expressions

Debezium binds certain variables into the evaluation context for the filter SMT. You can use these variables in the expressions that specify filter conditions. The SMT looks up and interprets their values as it evaluates the conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:

Table 1. Filter expression variables
| Name | Description | Type |
|---|---|---|
| key | A key of the message. | org.apache.kafka.connect.data.Struct |
| value | A value of the message. | org.apache.kafka.connect.data.Struct |
| keySchema | Schema of the message key. | org.apache.kafka.connect.data.Schema |
| valueSchema | Schema of the message value. | org.apache.kafka.connect.data.Schema |
| topic | Name of the target topic. | String |
| headers | A Java map of message headers. The key field is the header name. The headers variable exposes the following properties: value (of type Object) and schema (of type org.apache.kafka.connect.data.Schema). | java.util.Map<String, io.debezium.transforms.scripting.RecordHeader> |

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT handles the message. When the filter condition in an expression evaluates to true, the message is retained. When the filter condition evaluates to false, the message is removed.

Expressions should not result in any side effects. That is, they should not modify any of the variables that are passed to them.
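To illustrate how the bound variables can be combined, the following Groovy condition is a minimal sketch that keeps only create (c) events whose target topic name ends in .customers. The topic suffix is a hypothetical example, and the condition assumes that only data change events, which contain an op field, reach the SMT.

```properties
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
# topic is bound as a String, so String methods such as endsWith() can be called on it.
transforms.filter.condition=value.op == 'c' && topic.endsWith('.customers')
```

The expression only reads the variables and returns a Boolean, so it has no side effects.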

Options for applying the transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:

  • Configure an SMT predicate for the transformation.

  • Use the topic.regex configuration option for the SMT.
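One way to apply the SMT selectively is the topic.regex option of the filter SMT. The fragment below is a sketch only: the pattern inventory[.]inventory[.].* is hypothetical and assumes that data change topics for the database of interest share that prefix.

```properties
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
# Hypothetical pattern: evaluate the condition only for events bound for matching topics.
transforms.filter.topic.regex=inventory[.]inventory[.].*
```

Events whose destination topic does not match the pattern are passed through unmodified, so messages on other topics are never evaluated against the condition.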

Language specifics

The way that you express filtering conditions depends on the scripting language that you use.

For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression removes all messages, except for update records that have id values set to 2:

    value.op == 'u' && value.before.id == 2

Other languages use different methods to express the same condition.

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures.
To use the filter SMT with the MongoDB connector, you must first unwind the array fields in the JSON into separate documents.
You can do this by applying the MongoDB ExtractNewDocumentState SMT.

You could also take the approach of using a JSON parser within an expression to generate separate output documents for each array item.
For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.
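The first approach might be configured along the following lines. This is a sketch under stated assumptions: the aliases unwrap and filter are arbitrary, ExtractNewDocumentState is left at its default settings, and every document is assumed to contain a last_name field.

```properties
# Transformations run in the order listed: extract the document state first, then filter.
transforms=unwrap,filter
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
# Assumption: after extraction the value is a structure of document fields,
# so last_name can be read directly without parsing JSON in the expression.
transforms.filter.condition=value.last_name == 'Kretchmar'
```

Because the filter runs second, its condition is written against the output of the first transformation rather than against the original change event envelope.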

JavaScript

If you use JavaScript as the expression language, you can call the Struct#get() method to specify the filtering condition, as in the following example:

    value.get('op') == 'u' && value.get('before').get('id') == 2

JavaScript with Graal.js

If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:

    value.op == 'u' && value.before.id == 2
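Placed in a connector configuration, the Graal.js condition might look like the following sketch. It mirrors the basic Groovy configuration and changes only the language value to jsr223.graal.js; it assumes that the graalvm.js and graalvm.js.scriptengine libraries are already on the classpath.

```properties
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
# Select the Graal.js script engine through the JSR 223 API.
transforms.filter.language=jsr223.graal.js
transforms.filter.condition=value.op == 'u' && value.before.id == 2
```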

Configuration options

The following table lists the configuration options that you can use with the filter SMT.

Table 2. filter SMT configuration options

| Property | Default | Description |
|---|---|---|
| topic.regex | | An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the filter logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified. |
| language | | The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java™ Platform") only. |
| condition | | The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of true keeps the message, and a result of false removes it. |
| null.handling.mode | keep | Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options: keep (default), which passes the messages through; drop, which removes the messages completely; evaluate, which applies the filter condition to the messages. |
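As a brief illustration of the tombstone handling option, the following sketch extends the basic configuration so that null (tombstone) messages are removed rather than passed through. The property name null.handling.mode is taken from the Debezium filter SMT reference. The condition is the one from the basic example.

```properties
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
# Remove tombstone (null) messages instead of passing them through (the default is keep).
transforms.filter.null.handling.mode=drop
```

With drop or keep, the condition is never evaluated against a null value, so the expression does not need a null check. With evaluate, the expression itself must handle a null value.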