Debezium notifications

Overview

Debezium notifications provide a mechanism to obtain status information about the connector. Notifications can be sent to the following channels:

SinkNotificationChannel

Notifications are sent through the Connect API to a configured topic.

LogNotificationChannel

Notifications are appended to the log.

JmxNotificationChannel

Notifications are exposed as an attribute in a JMX bean.

Custom

Notifications are sent to a custom channel that you implement.

Debezium notification format

Notification messages contain the following information:

Property    Description

id

A unique identifier that is assigned to the notification. For incremental snapshot notifications, the id is the same as the id that was sent with the execute-snapshot signal.

aggregate_type

The data type of the aggregate root to which a notification is related. In domain-driven design, exported events should always refer to an aggregate.

type

Provides status information about the event specified in the aggregate_type field.

additional_data

A Map<String,String> with detailed information about the notification. For an example, see Debezium notifications about the progress of incremental snapshots.

timestamp

The time when the notification was created. The value represents the number of milliseconds since the UNIX epoch.
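The fields above can be modeled in a consuming application roughly as follows. This is a minimal sketch, not a Debezium class: the `NotificationMessage` record and its accessor names are hypothetical, and the sample values are taken from the initial snapshot example in this document.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical value type that mirrors the documented notification fields.
// It is an illustration only and is not part of Debezium.
record NotificationMessage(
        String id,
        String aggregateType,
        String type,
        Map<String, String> additionalData,
        long timestamp) {

    // Converts the epoch-millisecond timestamp into an Instant.
    Instant createdAt() {
        return Instant.ofEpochMilli(timestamp);
    }
}

class NotificationMessageDemo {
    public static void main(String[] args) {
        NotificationMessage message = new NotificationMessage(
                "5563ae14-49f8-4579-9641-c1bbc2d76f99",
                "Initial Snapshot",
                "COMPLETED",
                Map.of("connector_name", "myConnector"),
                1695817046353L);

        // Print a one-line summary of the notification.
        System.out.println(message.aggregateType() + " " + message.type()
                + " at " + message.createdAt());
    }
}
```

Keeping `additionalData` as a `Map<String, String>` matches the documented type of the additional_data field, so numeric details such as row counts arrive as strings and must be parsed by the consumer.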

Available notifications

Debezium notifications deliver information about the progress of initial snapshots or incremental snapshots.

Debezium notifications about the status of an initial snapshot

The following example shows a typical notification that provides the status of an initial snapshot:

  {
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", (1)
    "additional_data": {
      "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
  }

Item    Description

1

The type field can contain one of the following values:

  • STARTED

  • IN_PROGRESS

  • TABLE_SCAN_COMPLETED

  • COMPLETED

  • ABORTED

  • SKIPPED

The following table shows examples of the different payloads that might be present in notifications that report the status of initial snapshots:

Status    Payload

STARTED

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Initial Snapshot",
    "type": "STARTED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

IN_PROGRESS

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Initial Snapshot",
    "type": "IN_PROGRESS",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2",
      "current_collection_in_progress": "table1"
    },
    "timestamp": 1695817046353
  }

The data_collection field is currently not supported for the MongoDB connector.

TABLE_SCAN_COMPLETED

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Initial Snapshot",
    "type": "TABLE_SCAN_COMPLETED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collection": "table1, table2",
      "scanned_collection": "table1",
      "total_rows_scanned": "100",
      "status": "SUCCEEDED"
    },
    "timestamp": 1695817046353
  }

In the preceding example, the additional_data.status field can contain one of the following values:

    SQL_EXCEPTION

    A SQL exception occurred while performing the snapshot.

    SUCCEEDED

    The snapshot completed successfully.

The total_rows_scanned and data_collection fields are currently not supported for the MongoDB connector.

COMPLETED

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

ABORTED

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Initial Snapshot",
    "type": "ABORTED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

SKIPPED

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Initial Snapshot",
    "type": "SKIPPED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

Debezium notifications about the progress of incremental snapshots

The following table shows examples of the different payloads that might be present in notifications that report the status of incremental snapshots:

Status    Payload

Start

  {
    "id": "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type": "Incremental Snapshot",
    "type": "STARTED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Paused

  {
    "id": "068d07a5-d16b-4c4a-b95f-8ad061a69d51",
    "aggregate_type": "Incremental Snapshot",
    "type": "PAUSED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Resumed

  {
    "id": "a9468204-769d-430f-96d2-b0933d4839f3",
    "aggregate_type": "Incremental Snapshot",
    "type": "RESUMED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2"
    },
    "timestamp": 1695817046353
  }

Stopped

  {
    "id": "83fb3d6c-190b-4e40-96eb-f8f427bf482c",
    "aggregate_type": "Incremental Snapshot",
    "type": "ABORTED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

Processing chunk

  {
    "id": "d02047d6-377f-4a21-a4e9-cb6e817cf744",
    "aggregate_type": "Incremental Snapshot",
    "type": "IN_PROGRESS",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collections": "table1, table2",
      "current_collection_in_progress": "table1",
      "maximum_key": "100",
      "last_processed_key": "50"
    },
    "timestamp": 1695817046353
  }

Snapshot completed for a table

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Incremental Snapshot",
    "type": "TABLE_SCAN_COMPLETED",
    "additional_data": {
      "connector_name": "my-connector",
      "data_collection": "table1, table2",
      "scanned_collection": "table1",
      "total_rows_scanned": "100",
      "status": "SUCCEEDED"
    },
    "timestamp": 1695817046353
  }

In the preceding example, the additional_data.status field can contain one of the following values:

    EMPTY

    The table contains no values.

    NO_PRIMARY_KEY

    Cannot complete snapshot; table has no primary key.

    SKIPPED

    Cannot complete a snapshot for this type of table. Refer to the logs for details.

    SQL_EXCEPTION

    A SQL exception occurred while performing the snapshot.

    SUCCEEDED

    The snapshot completed successfully.

    UNKNOWN_SCHEMA

    Could not find a schema for the table. Check the logs for the list of known tables.

Completed

  {
    "id": "6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type": "Incremental Snapshot",
    "type": "COMPLETED",
    "additional_data": {
      "connector_name": "my-connector"
    },
    "timestamp": 1695817046353
  }

Enabling Debezium notifications

To enable Debezium to emit notifications, specify a list of notification channels by setting the notification.enabled.channels configuration property. By default, the following notification channels are available:

  • sink

  • log

  • jmx

To use the sink notification channel, you must also set the notification.sink.topic.name configuration property to the name of the topic where you want Debezium to send notifications.
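As an illustration, the two properties described above could be set programmatically, for example when connector configuration is assembled as `java.util.Properties`. This is a sketch under stated assumptions: the comma-separated list format for multiple channels is assumed, and the class name and the topic name `debezium_notifications` are hypothetical.

```java
import java.util.Properties;

class NotificationConfigSketch {

    // Builds only the notification-related part of a connector configuration.
    static Properties notificationProperties(String sinkTopic) {
        Properties props = new Properties();
        // Assumption: multiple channels are given as a comma-separated list.
        props.setProperty("notification.enabled.channels", "sink,log");
        // Required when the sink channel is enabled: the topic that receives notifications.
        props.setProperty("notification.sink.topic.name", sinkTopic);
        return props;
    }

    public static void main(String[] args) {
        // "debezium_notifications" is a hypothetical topic name.
        Properties props = notificationProperties("debezium_notifications");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same two keys apply when the connector is configured through a properties file or a JSON registration request; only the surrounding syntax differs.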

Access to Debezium JMX notifications

To enable Debezium to report events that are exposed through JMX beans, complete the following configuration steps:

  1. Enable the JMX MBean Server to expose the notification bean.

  2. Add jmx to the notification.enabled.channels property in the connector configuration.

  3. Connect your preferred JMX client to the MBean Server.

Notifications are exposed through the Notifications attribute of a bean with the name debezium.<connector-type>.management.notifications.<server>.

The following image shows a notification that reports the start of an incremental snapshot:

Figure: Fields in the JMX Notifications attribute

To discard a notification, call the reset operation on the bean.

The notifications are also exposed as a JMX notification with type debezium.notification. To enable an application to listen for the JMX notifications that an MBean emits, subscribe the application to the notifications.
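A subscription that filters on the debezium.notification type might look like the following sketch, which uses only the standard `javax.management` API. To stay runnable without a connector, a local `NotificationBroadcasterSupport` instance stands in for the Debezium bean, and the notification message text is made up. Against a real MBean Server, you would typically pass the same listener and filter to `MBeanServerConnection.addNotificationListener` together with the bean's object name.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;

class JmxNotificationListenerSketch {

    // Messages of the notifications that passed the filter.
    static final List<String> received = new ArrayList<>();

    // Subscribes to the broadcaster and accepts only debezium.notification events.
    static void subscribe(NotificationBroadcasterSupport bean) {
        NotificationFilterSupport filter = new NotificationFilterSupport();
        filter.enableType("debezium.notification");
        NotificationListener listener =
                (notification, handback) -> received.add(notification.getMessage());
        bean.addNotificationListener(listener, filter, null);
    }

    public static void main(String[] args) {
        // Stand-in for the Debezium notification bean (assumption for this demo).
        NotificationBroadcasterSupport bean = new NotificationBroadcasterSupport();
        subscribe(bean);

        // The first notification matches the filter; the second is dropped.
        bean.sendNotification(new Notification(
                "debezium.notification", "demo-source", 1L, "Incremental Snapshot STARTED"));
        bean.sendNotification(new Notification(
                "other.type", "demo-source", 2L, "ignored"));

        System.out.println(received);
    }
}
```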

Custom notification channels

The notification mechanism is designed to be extensible. You can implement channels as needed to deliver notifications in a manner that works best in your environment. Adding a notification channel involves several steps:

  1. Create a Java project to implement the channel, and add Debezium Core as a dependency.

  2. Deploy the notification channel.

  3. Enable connectors to use the custom notification channel by modifying the connector configuration.

Configuring custom notification channels

Custom notification channels are Java classes that implement the io.debezium.pipeline.notification.channels.NotificationChannel service provider interface (SPI). For example:

  public interface NotificationChannel {
      String name(); (1)
      void init(CommonConnectorConfig config); (2)
      void send(Notification notification); (3)
      void close(); (4)
  }

Item    Description

1

The name of the channel. To enable Debezium to use the channel, specify this name in the connector’s notification.enabled.channels property.

2

Initializes specific configuration, variables, or connections that the channel requires.

3

Sends the notification on the channel. Debezium calls this method to report its status.

4

Closes all allocated resources. Debezium calls this method when the connector is stopped.
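The following sketch shows what an implementation of the four methods could look like. So that it compiles without Debezium on the classpath, it declares local stand-ins for `CommonConnectorConfig`, `Notification`, and `NotificationChannel` that only mirror the shape shown above. In a real project you would remove the stand-ins and import the Debezium types instead. The `InMemoryNotificationChannel` class, its channel name `in-memory`, and the `buffered()` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the Debezium types (assumption: used only to keep the sketch self-contained).
class CommonConnectorConfig { }

class Notification {
    private final String type;

    Notification(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "Notification{type=" + type + "}";
    }
}

interface NotificationChannel {
    String name();
    void init(CommonConnectorConfig config);
    void send(Notification notification);
    void close();
}

// Hypothetical channel that buffers notifications in memory.
class InMemoryNotificationChannel implements NotificationChannel {

    private final List<Notification> buffer = new ArrayList<>();

    @Override
    public String name() {
        // The name to list in notification.enabled.channels.
        return "in-memory";
    }

    @Override
    public void init(CommonConnectorConfig config) {
        // Start from a clean state; a real channel might open a connection here.
        buffer.clear();
    }

    @Override
    public void send(Notification notification) {
        buffer.add(notification);
    }

    @Override
    public void close() {
        // Release resources; here that only means dropping buffered notifications.
        buffer.clear();
    }

    // Read-only copy of the buffered notifications, for inspection.
    List<Notification> buffered() {
        return List.copyOf(buffer);
    }
}

class InMemoryNotificationChannelDemo {
    public static void main(String[] args) {
        InMemoryNotificationChannel channel = new InMemoryNotificationChannel();
        channel.init(new CommonConnectorConfig());
        channel.send(new Notification("STARTED"));
        System.out.println(channel.name() + " holds " + channel.buffered());
        channel.close();
    }
}
```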

Debezium core module dependencies

A custom notification channel Java project has compile dependencies on the Debezium core module. You must include these compile dependencies in your project’s pom.xml file, as shown in the following example:

  <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-core</artifactId>
      <version>${version.debezium}</version> (1)
  </dependency>

Item    Description

1

${version.debezium} represents the version of the Debezium connector.

Declare your implementation in the META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel file.

Deploying a custom notification channel

Prerequisites

  • You have a custom notification channel Java program.

Procedure

  • To use a notification channel with a Debezium connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each Debezium connector that you want to use it with.

    For example, in a typical deployment, the Debezium connector files are stored in subdirectories of a Kafka Connect directory (/kafka/connect), with each connector JAR in its own subdirectory (/kafka/connect/debezium-connector-db2, /kafka/connect/debezium-connector-mysql, and so forth). To use a notification channel with a connector, add the notification channel JAR file to the connector’s subdirectory.

To use a custom notification channel with multiple connectors, you must place a copy of the notification channel JAR file in each connector subdirectory.

Configuring connectors to use a custom notification channel

In the connector configuration, add the name of the custom notification channel to the notification.enabled.channels property.