Debezium notifications

Overview

Debezium notifications provide a mechanism to obtain status information about the connector. Notifications can be sent to the following channels:

SinkNotificationChannel

Sends notifications through the Connect API to a configured topic.

LogNotificationChannel

Notifications are appended to the log.

JmxNotificationChannel

Notifications are exposed as an attribute in a JMX bean.

Custom

Notifications are sent to a custom channel that you implement.

Debezium notification format

Notification messages contain the following information:

Property | Description

id

A unique identifier that is assigned to the notification. For incremental snapshot notifications, the id is the same as the one that was sent with the execute-snapshot signal.

aggregate_type

The data type of the aggregate root to which a notification is related. In domain-driven design, exported events should always refer to an aggregate.

type

Provides status information about the event specified in the aggregate_type field.

additional_data

A Map<String,String> with detailed information about the notification. For an example, see Debezium notifications about the progress of incremental snapshots.

timestamp

The time when the notification was created, expressed as a Unix epoch timestamp in milliseconds.
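
To make the field list above concrete, the following minimal sketch models a notification as a Java value type. The record name NotificationMessage and its helper methods are hypothetical and are not part of Debezium; the sketch assumes Java 16 or later (for records) and only mirrors the documented fields.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical value type that mirrors the documented notification fields.
// It is an illustration only and is not a Debezium class.
record NotificationMessage(
        String id,
        String aggregateType,
        String type,
        Map<String, String> additionalData,
        long timestamp) {

    // Converts the epoch-millisecond timestamp to an Instant.
    Instant createdAt() {
        return Instant.ofEpochMilli(timestamp);
    }

    // Returns a value from additional_data, or the fallback when the key is absent.
    String detail(String key, String fallback) {
        return additionalData.getOrDefault(key, fallback);
    }
}
```

A consumer could, for example, call detail("connector_name", "unknown") to find out which connector emitted a notification.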

Available notifications

Debezium notifications deliver information about the progress of initial snapshots or incremental snapshots.

Debezium notifications about the status of an initial snapshot

The following example shows a typical notification that provides the status of an initial snapshot:

  {
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", (1)
    "additional_data": {
      "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
  }

(1) The type field can contain one of the following values:
  • STARTED

  • IN_PROGRESS

  • TABLE_SCAN_COMPLETED

  • COMPLETED

  • ABORTED

  • SKIPPED

Debezium notifications about the progress of incremental snapshots

The following table shows examples of the different payloads that might be present in notifications that report the status of incremental snapshots:

Status | Payload

Start

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Incremental Snapshot",
    "type": "STARTED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Paused

  {
    "id": "068d07a5-d16b-4c4a-b95f-8ad061a69d51",
    "aggregate_type": "Incremental Snapshot",
    "type": "PAUSED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Resumed

  {
    "id": "a9468204-769d-430f-96d2-b0933d4839f3",
    "aggregate_type": "Incremental Snapshot",
    "type": "RESUMED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Stopped

  {
    "id": "83fb3d6c-190b-4e40-96eb-f8f427bf482c",
    "aggregate_type": "Incremental Snapshot",
    "type": "ABORTED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

Processing chunk

  {
    "id": "d02047d6-377f-4a21-a4e9-cb6e817cf744",
    "aggregate_type": "Incremental Snapshot",
    "type": "IN_PROGRESS",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2",
      "current_collection_in_progress": "table1",
      "maximum_key": "100",
      "last_processed_key": "50"
    },
    "timestamp": 1695817046353
  }

Snapshot completed for a table

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Incremental Snapshot",
    "type": "TABLE_SCAN_COMPLETED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collection": "table1, table2",
      "scanned_collection": "table1",
      "total_rows_scanned": "100",
      "status": "SUCCEEDED" (1)
    },
    "timestamp": 1695817046353
  }

(1) The possible values of the status field are:
  • EMPTY - table is empty

  • NO_PRIMARY_KEY - table has no primary key necessary for snapshot

  • SKIPPED - snapshot for this kind of table is not supported, check logs for details

  • SQL_EXCEPTION - SQL exception caught while processing a snapshot

  • SUCCEEDED - snapshot completed successfully

  • UNKNOWN_SCHEMA - schema not found for table, check logs for the list of known tables

Completed

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Incremental Snapshot",
    "type": "COMPLETED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }
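
As one possible way to consume the payloads listed above, the following sketch turns the type and additional_data of an incremental snapshot notification into a one-line summary. The class IncrementalSnapshotProgress and its describe method are hypothetical helpers, not part of Debezium. The sketch assumes that the notification has already been parsed into a type string and a map, and it reads only the keys that appear in the example payloads.

```java
import java.util.Map;

// Hypothetical helper; not part of Debezium.
final class IncrementalSnapshotProgress {

    // Renders a one-line summary from the type and additional_data of a notification.
    static String describe(String type, Map<String, String> data) {
        switch (type) {
            case "IN_PROGRESS":
                return "chunk of " + data.get("current_collection_in_progress")
                        + ": last key " + data.get("last_processed_key")
                        + " of max " + data.get("maximum_key");
            case "TABLE_SCAN_COMPLETED":
                return "scanned " + data.get("scanned_collection")
                        + ": " + data.get("total_rows_scanned")
                        + " rows, status " + data.get("status");
            default:
                // STARTED, PAUSED, RESUMED, ABORTED, and COMPLETED carry no
                // per-table details in the example payloads.
                return type + " for connector " + data.get("connector_name");
        }
    }
}
```

The keys are treated as opaque strings, because the examples deliver every value in additional_data as a string.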

Enabling Debezium notifications

To enable Debezium to emit notifications, specify a list of notification channels by setting the notification.enabled.channels configuration property. By default, the following notification channels are available:

  • sink

  • log

  • jmx

To use the sink notification channel, you must also set the notification.sink.topic.name configuration property to the name of the topic where you want Debezium to send notifications.
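
As an illustration of the two properties described above, the following sketch assembles the notification-related part of a connector configuration as a Java map. The class name, the topic name debezium_notifications, and the choice of the sink and log channels are placeholders. The sketch also assumes that the channel list is written as a comma-separated string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds only the notification-related properties.
final class NotificationConfigExample {

    static Map<String, String> notificationProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Channels to enable; assumed to be a comma-separated list.
        props.put("notification.enabled.channels", "sink,log");
        // Needed because the sink channel is enabled; the topic name is a placeholder.
        props.put("notification.sink.topic.name", "debezium_notifications");
        return props;
    }
}
```

These entries would be merged with the rest of the connector configuration before the connector is registered.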

Access to Debezium JMX notifications

To enable Debezium to report events that are exposed through JMX beans, complete the following configuration steps:

  1. Enable the JMX MBean Server to expose the notification bean.

  2. Add jmx to the notification.enabled.channels property in the connector configuration.

  3. Connect your preferred JMX client to the MBean Server.

Notifications are exposed through the Notifications attribute of a bean with the name debezium.<connector-type>.management.notifications.<server>.

The following image shows a notification that reports the start of an incremental snapshot:

Figure: Fields in the JMX Notifications attribute

To discard a notification, call the reset operation on the bean.

The notifications are also exposed as a JMX notification with type debezium.notification. To enable an application to listen for the JMX notifications that an MBean emits, subscribe the application to the notifications.
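
The subscription step can be sketched with the standard javax.management API. In the following example, the class DebeziumJmxSubscriber is hypothetical, and the caller is assumed to supply both an open MBeanServerConnection and the ObjectName of the notification bean. The only value taken from the text above is the notification type debezium.notification.

```java
import java.io.IOException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

// Hypothetical helper; not part of Debezium.
final class DebeziumJmxSubscriber {

    // Builds a filter that accepts only notifications whose type starts
    // with "debezium.notification".
    static NotificationFilterSupport debeziumFilter() {
        NotificationFilterSupport filter = new NotificationFilterSupport();
        filter.enableType("debezium.notification");
        return filter;
    }

    // Registers the listener on the given bean. The connection and the bean
    // name are supplied by the caller and are not derived here.
    static void subscribe(MBeanServerConnection connection,
                          ObjectName beanName,
                          NotificationListener listener)
            throws InstanceNotFoundException, IOException {
        connection.addNotificationListener(beanName, listener, debeziumFilter(), null);
    }
}
```

Filtering by type keeps the listener from being called for unrelated notifications that the same bean might emit.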

Custom notification channels

The notification mechanism is designed to be extensible. You can implement channels as needed to deliver notifications in a manner that works best in your environment. Adding a notification channel involves several steps:

  1. Create a Java project to implement the channel, and add Debezium Core as a dependency.

  2. Deploy the notification channel.

  3. Enable connectors to use the custom notification channel by modifying the connector configuration.

Configuring custom notification channels

Custom notification channels are Java classes that implement the io.debezium.pipeline.notification.channels.NotificationChannel service provider interface (SPI). For example:

  public interface NotificationChannel {
      String name();                            // (1)
      void init(CommonConnectorConfig config);  // (2)
      void send(Notification notification);     // (3)
      void close();                             // (4)
  }

(1) The name of the channel. To enable Debezium to use the channel, specify this name in the connector’s notification.enabled.channels property.
(2) Initializes specific configuration, variables, or connections that the channel requires.
(3) Sends the notification on the channel. Debezium calls this method to report its status.
(4) Closes all allocated resources. Debezium calls this method when the connector is stopped.
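
As a rough illustration of the interface above, the following sketch shows a hypothetical channel that buffers notifications in memory. The class name InMemoryNotificationChannel and the channel name in-memory are made up for this example. So that the sketch compiles on its own, it declares empty local stand-ins for CommonConnectorConfig and Notification and repeats the interface. In a real project you would delete those stand-ins and import the corresponding types from debezium-core instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-ins that exist only so this sketch compiles without Debezium on the
// classpath. In a real project, delete them and import the debezium-core types.
interface CommonConnectorConfig { }

interface Notification { }

interface NotificationChannel {
    String name();
    void init(CommonConnectorConfig config);
    void send(Notification notification);
    void close();
}

// Hypothetical channel that buffers notifications in memory.
final class InMemoryNotificationChannel implements NotificationChannel {

    private final List<Notification> buffer =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public String name() {
        // The value to list in notification.enabled.channels (made up here).
        return "in-memory";
    }

    @Override
    public void init(CommonConnectorConfig config) {
        // Start from an empty buffer; a real channel might open connections here.
        buffer.clear();
    }

    @Override
    public void send(Notification notification) {
        buffer.add(notification);
    }

    @Override
    public void close() {
        // Release what init() and send() allocated.
        buffer.clear();
    }

    // Returns a copy of the notifications received so far.
    List<Notification> received() {
        synchronized (buffer) {
            return new ArrayList<>(buffer);
        }
    }
}
```

With such a class packaged and declared as a service provider, adding in-memory to the notification.enabled.channels property would select it.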

Debezium core module dependencies

A custom notification channel Java project has compile dependencies on the Debezium core module. You must include these compile dependencies in your project’s pom.xml file, as shown in the following example:

  <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-core</artifactId>
      <version>${version.debezium}</version> <!-- (1) -->
  </dependency>

(1) ${version.debezium} represents the version of the Debezium connector.

Declare your implementation in the META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel file.

Deploying a custom notification channel

Prerequisites

  • You have a custom notification channel Java program.

Procedure

  • To use a notification channel with a Debezium connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each Debezium connector that you want to use it with.

    For example, in a typical deployment, the Debezium connector files are stored in subdirectories of a Kafka Connect directory (/kafka/connect), with each connector JAR in its own subdirectory (/kafka/connect/debezium-connector-db2, /kafka/connect/debezium-connector-mysql, and so forth). To use a notification channel with a connector, add the notification channel JAR file to the connector’s subdirectory.

To use a custom notification channel with multiple connectors, you must place a copy of the notification channel JAR file in each connector subdirectory.

Configuring connectors to use a custom notification channel

In the connector configuration, add the name of the custom notification channel to the notification.enabled.channels property.