Tutorial

Introduction to Debezium

Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases.

Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur, and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.

By taking advantage of Kafka’s reliable streaming platform, Debezium makes it possible for applications to consume changes that occur in a database correctly and completely. Even if your application stops unexpectedly, or loses its connection, it does not miss events that occur during the outage. After the application restarts, it resumes reading from the topic from the point where it left off.

The tutorial that follows shows you how to deploy and use the Debezium MySQL connector with a simple configuration. For more information about deploying and using Debezium connectors, see the connector documentation.

Starting the services

Using Debezium requires three separate services: ZooKeeper, Kafka, and the Debezium connector service. In this tutorial, you will set up a single instance of each service using Docker and the Debezium container images.

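To start the services needed for this tutorial, you must:

  • Start ZooKeeper
  • Start Kafka
  • Start a MySQL database
  • Start a MySQL command line client
  • Start Kafka Connect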

Considerations for running Debezium with Docker

This tutorial uses Docker and the Debezium container images to run the ZooKeeper, Kafka, Debezium, and MySQL services. Running each service in a separate container simplifies the setup so that you can see Debezium in action.

In a production environment, you would run multiple instances of each service to provide performance, reliability, replication, and fault tolerance. Typically, you would either deploy these services on a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, or you would install on dedicated hardware.

You should be aware of the following considerations for running Debezium with Docker:

  • The containers for ZooKeeper and Kafka are ephemeral.

    ZooKeeper and Kafka typically store their data locally inside the containers. To keep that data after the containers stop, you would mount directories on the host machine as volumes. This tutorial skips that setup: when a container is stopped, all persisted data is lost. This keeps cleanup simple when you complete the tutorial.

    For more information about storing persistent data, see the documentation for the container images.

  • This tutorial requires you to run each service in a different container.

    To avoid confusion, you will run each container in the foreground in a separate terminal. This way, all of the output of a container will be displayed in the terminal used to run it.

    Docker also allows you to run a container in detached mode (with the -d option), where the container is started and the docker command returns immediately. However, detached mode containers do not display their output in the terminal. To see the output, you would need to use the docker logs --follow <container-name> command. For more information, see the Docker documentation.
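
As a rough sketch of the detached-mode alternative described above, the following shell functions wrap the two Docker commands involved. The function names run_detached and follow_logs are made up for this illustration and are not part of the tutorial, which keeps every container in the foreground.

```shell
#!/bin/sh
# Hypothetical helpers (not part of the tutorial) for running containers detached.

# Start a container in the background; docker prints the container ID and returns.
# Usage: run_detached <container-name> <image> [extra docker run options...]
run_detached() {
  name=$1
  image=$2
  shift 2
  docker run -d --rm --name "$name" "$@" "$image"
}

# Stream the output of a detached container until interrupted with Ctrl+C.
# Usage: follow_logs <container-name>
follow_logs() {
  docker logs --follow "$1"
}
```

For example, run_detached zookeeper quay.io/debezium/zookeeper:2.1 -p 2181:2181 followed by follow_logs zookeeper would show the same output that the foreground container prints.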

Starting ZooKeeper

ZooKeeper is the first service you must start.

Procedure

  1. Open a terminal and use it to start ZooKeeper in a container.

    This command runs a new container using version 2.1 of the quay.io/debezium/zookeeper image:

    $ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.1

    -it

    The container is interactive, which means the terminal’s standard input and output are attached to the container.

    --rm

    The container will be removed when it is stopped.

    --name zookeeper

    The name of the container.

    -p 2181:2181 -p 2888:2888 -p 3888:3888

    Maps three of the container’s ports to the same ports on the Docker host. This enables other containers (and applications outside of the container) to communicate with ZooKeeper.

    If you use Podman, run the following commands:

    $ sudo podman pod create --name=dbz --publish 9092,3306,8083
    $ sudo podman run -it --rm --name zookeeper --pod dbz quay.io/debezium/zookeeper:2.1

  2. Verify that ZooKeeper started and is listening on port 2181.

    You should see output similar to the following:

    Starting up in standalone mode
    ZooKeeper JMX enabled by default
    Using config: /zookeeper/conf/zoo.cfg
    2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
    2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
    2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
    ...
    port 0.0.0.0/0.0.0.0:2181 (1)

    (1) This line indicates that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
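
Instead of watching the log for the port line, the check can be scripted. The function below is a minimal sketch, assuming a netcat (nc) build that supports the -z option; the name wait_for_port is invented for this example.

```shell
#!/bin/sh
# Poll a TCP port until it accepts connections or the attempts run out.
# Assumes a netcat (nc) build that supports -z (connect without sending data).
# Usage: wait_for_port <host> <port> [attempts]
wait_for_port() {
  host=$1
  port=$2
  attempts=${3:-30}
  while [ "$attempts" -gt 0 ]; do
    if nc -z "$host" "$port" 2>/dev/null; then
      echo "$host:$port is accepting connections"
      return 0
    fi
    attempts=$((attempts - 1))
    # Pause between attempts, but not after the final one.
    if [ "$attempts" -gt 0 ]; then
      sleep 1
    fi
  done
  echo "$host:$port did not open in time" >&2
  return 1
}
```

Calling wait_for_port localhost 2181 before starting Kafka would block until the mapped ZooKeeper port answers, or give up after 30 attempts.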

Starting Kafka

After starting ZooKeeper, you can start Kafka in a new container.

Debezium 2.1.4.Final has been tested against multiple versions of Kafka Connect. Please refer to the Debezium Test Matrix to determine compatibility between Debezium and Kafka Connect.

Procedure

  1. Open a new terminal and use it to start Kafka in a container.

    This command runs a new container using version 2.1 of the quay.io/debezium/kafka image:

    $ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.1

    -it

    The container is interactive, which means the terminal’s standard input and output are attached to the container.

    --rm

    The container will be removed when it is stopped.

    --name kafka

    The name of the container.

    -p 9092:9092

    Maps port 9092 in the container to the same port on the Docker host so that applications outside of the container can communicate with Kafka.

    --link zookeeper:zookeeper

    Tells the container that it can find ZooKeeper in the zookeeper container, which is running on the same Docker host.

    If you use Podman, run the following command:

    $ sudo podman run -it --rm --name kafka --pod dbz quay.io/debezium/kafka:2.1

    In this tutorial, you will always connect to Kafka from within a Docker container. Any of these containers can communicate with the kafka container by linking to it. If you needed to connect to Kafka from outside of a Docker container, you would have to set the -e option to advertise the Kafka address through the Docker host (-e ADVERTISED_HOST_NAME= followed by either the IP address or resolvable host name of the Docker host).

  2. Verify that Kafka started.

    You should see output similar to the following:

    ...
    2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
    2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
    ...
    2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started (1)

    (1) The Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.
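
The note in step 1 about reaching Kafka from outside a Docker container can be sketched as follows. This only adds the -e ADVERTISED_HOST_NAME option mentioned above to the tutorial's own command; the function name start_kafka_advertised is illustrative, and the address you pass depends on your Docker host.

```shell
#!/bin/sh
# Start the tutorial's Kafka container so that clients outside Docker can reach it.
# $1 is the IP address or resolvable host name of the Docker host.
start_kafka_advertised() {
  docker run -it --rm --name kafka -p 9092:9092 \
    -e ADVERTISED_HOST_NAME="$1" \
    --link zookeeper:zookeeper \
    quay.io/debezium/kafka:2.1
}
```

The rest of this tutorial does not need this variant, because every client connects from a linked container.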

Starting a MySQL database

At this point, you have started ZooKeeper and Kafka, but you still need a database server from which Debezium can capture changes. In this procedure, you will start a MySQL server with an example database.

Procedure

  1. Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an inventory database.

    This command runs a new container using version 2.1 of the quay.io/debezium/example-mysql image, which is based on the mysql:8.0 image. It also defines and populates a sample inventory database:

    $ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.1

    -it

    The container is interactive, which means the terminal’s standard input and output are attached to the container.

    --rm

    The container will be removed when it is stopped.

    --name mysql

    The name of the container.

    -p 3306:3306

    Maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that applications outside of the container can connect to the database server.

    -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw

    Sets the root password and creates a user (with a password) that has the minimum privileges required by the Debezium MySQL connector.

    If you use Podman, run the following command:

    $ sudo podman run -it --rm --name mysql --pod dbz -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.1

  2. Verify that the MySQL server starts.

    The MySQL server starts and stops a few times as the configuration is modified. You should see output similar to the following:

    ...
    [System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server - GPL.
    [System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: '::' port: 33060, socket: /var/run/mysqld/mysqlx.sock

Starting a MySQL command line client

After starting MySQL, you start a MySQL command line client so that you can access the sample inventory database.

Procedure

  1. Open a new terminal, and use it to start the MySQL command line client in a container.

    This command runs a new container using the mysql:8.0 image, and defines a shell command to run the MySQL command line client with the correct options:

    $ docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

    -it

    The container is interactive, which means the terminal’s standard input and output are attached to the container.

    --rm

    The container will be removed when it is stopped.

    --name mysqlterm

    The name of the container.

    --link mysql

    Links the container to the mysql container.

    If you use Podman, run the following command:

    $ sudo podman run -it --rm --name mysqlterm --pod dbz mysql:5.7 sh -c 'exec mysql -h 0.0.0.0 -uroot -pdebezium'

  2. Verify that the MySQL command line client started.

    You should see output similar to the following:

    mysql: [Warning] Using a password on the command line interface can be insecure.
    Welcome to the MySQL monitor. Commands end with ; or \g.
    Your MySQL connection id is 9
    Server version: 8.0.27 MySQL Community Server - GPL
    Copyright (c) 2000, 2021, Oracle and/or its affiliates.
    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    mysql>

  3. At the mysql> command prompt, switch to the inventory database:

    mysql> use inventory;

  4. List the tables in the database:

    mysql> show tables;
    +---------------------+
    | Tables_in_inventory |
    +---------------------+
    | addresses           |
    | customers           |
    | geom                |
    | orders              |
    | products            |
    | products_on_hand    |
    +---------------------+
    6 rows in set (0.00 sec)

  5. Use the MySQL command line client to explore the database and view the pre-loaded data in the database.

    For example:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
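
The same exploration can be done without an interactive session. The sketch below is an assumption-laden convenience, not a step of the tutorial: it assumes the mysql container started earlier is still running, that its image includes the mysql client program, and that the root password is the debezium value set through MYSQL_ROOT_PASSWORD. The function name inventory_query is hypothetical.

```shell
#!/bin/sh
# Run one SQL statement against the inventory database without an interactive session.
# Assumes the tutorial's mysql container is running and ships the mysql client,
# and uses the root password set by MYSQL_ROOT_PASSWORD=debezium.
# Usage: inventory_query '<SQL statement>'
inventory_query() {
  docker exec mysql mysql -uroot -pdebezium inventory -e "$1"
}
```

For example, inventory_query 'SELECT * FROM customers;' would print the same four rows shown above, and inventory_query "SHOW VARIABLES LIKE 'binlog_format';" would show how the server writes the binlog that the connector reads later in this tutorial.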

Starting Kafka Connect

After starting MySQL and connecting to the inventory database with the MySQL command line client, you start the Kafka Connect service. This service exposes a REST API to manage the Debezium MySQL connector.

Procedure

  1. Open a new terminal, and use it to start the Kafka Connect service in a container.

    This command runs a new container using the 2.1 version of the quay.io/debezium/connect image:

    $ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:2.1

    -it

    The container is interactive, which means the terminal’s standard input and output are attached to the container.

    --rm

    The container will be removed when it is stopped.

    --name connect

    The name of the container.

    -p 8083:8083

    Maps port 8083 in the container to the same port on the Docker host. This enables applications outside of the container to use Kafka Connect’s REST API to set up and manage new connector instances.

    -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses

    Sets environment variables required by the Debezium image.

    --link kafka:kafka --link mysql:mysql

    Links the container to the containers that are running Kafka and the MySQL server.

    If you use Podman, run the following command:

    $ sudo podman run -it --rm --name connect --pod dbz -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses quay.io/debezium/connect:2.1

    If you provide a --hostname command option, the Kafka Connect REST API will not listen on the localhost interface. This can cause issues when the REST port is being exposed.

    If this is a problem, set the environment variable REST_HOST_NAME=0.0.0.0, which ensures that the REST API is accessible from all interfaces.

  2. Verify that Kafka Connect started and is ready to accept connections.

    You should see output similar to the following:

    ...
    2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
    ...
    2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

  3. Use the Kafka Connect REST API to check the status of the Kafka Connect service.

    Kafka Connect exposes a REST API to manage Debezium connectors. To communicate with the Kafka Connect service, you can use the curl command to send API requests to port 8083 of the Docker host (which you mapped to port 8083 in the connect container when you started Kafka Connect).

    These commands use localhost. If you are using a non-native Docker platform (such as Docker Toolbox), replace localhost with the IP address of your Docker host.

    1. Open a new terminal and check the status of the Kafka Connect service:

      $ curl -H "Accept:application/json" localhost:8083/
      {"version":"3.3.1","commit":"cb8625948210849f"} (1)

      (1) The response shows that Kafka Connect version 3.3.1 is running.

    2. Check the list of connectors registered with Kafka Connect:

      $ curl -H "Accept:application/json" localhost:8083/connectors/
      [] (1)

      (1) No connectors are currently registered with Kafka Connect.
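
The readiness check above can also be scripted, which is handy when the services are started from a script rather than by hand. This is a minimal sketch: CONNECT_URL and connect_is_ready are names chosen for the example, and the default address is the localhost:8083 mapping used in this tutorial.

```shell
#!/bin/sh
# Succeed once the Kafka Connect REST API answers GET /connectors with HTTP 200.
# CONNECT_URL is a name chosen for this sketch; it defaults to the tutorial's address.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

connect_is_ready() {
  # -s silences progress output, -o discards the body, -w prints only the status code.
  # If curl cannot connect at all, fall back to a status that never matches 200.
  status=$(curl -s -o /dev/null -w '%{http_code}' "$CONNECT_URL/connectors") || status=000
  [ "$status" = "200" ]
}
```

A loop such as until connect_is_ready; do sleep 2; done would then wait until the service accepts requests.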

Deploying the MySQL connector

After starting the Debezium and MySQL services, you are ready to deploy the Debezium MySQL connector so that it can start monitoring the sample MySQL database (inventory).

At this point, you are running the Debezium services, a MySQL database server with a sample inventory database, and the MySQL command line client that is connected to the database. To deploy the MySQL connector, you must:

  • Register the MySQL connector to monitor the inventory database
  • Watch the MySQL connector start

Registering a connector to monitor the inventory database

When you register the Debezium MySQL connector, it starts monitoring the MySQL database server’s binlog. The binlog records all of the database’s transactions (such as changes to individual rows and changes to the schemas). When a row in the database changes, Debezium generates a change event.

In a production environment, you would typically either use the Kafka tools to manually create the necessary topics, including specifying the number of replicas, or you’d use the Kafka Connect mechanism for customizing the settings of auto-created topics. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.
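
As an illustration of the manual approach mentioned above, the sketch below wraps Kafka's kafka-topics.sh tool. KAFKA_TOPICS_CMD is a placeholder for wherever that script lives in your installation, and create_topic is a name invented for this example; neither is needed for the tutorial, which relies on automatic topic creation.

```shell
#!/bin/sh
# Sketch: pre-create a change-event topic with explicit partition and replica counts.
# KAFKA_TOPICS_CMD is a placeholder for wherever kafka-topics.sh lives in your install.
KAFKA_TOPICS_CMD=${KAFKA_TOPICS_CMD:-kafka-topics.sh}

# Usage: create_topic <topic> <partitions> <replication-factor> <bootstrap-server>
create_topic() {
  "$KAFKA_TOPICS_CMD" --create \
    --topic "$1" \
    --partitions "$2" \
    --replication-factor "$3" \
    --bootstrap-server "$4"
}
```

In a cluster with at least three brokers, create_topic dbserver1.inventory.customers 1 3 kafka:9092 would create the topic for the customers table with three replicas; the name follows the prefix.database.table pattern that the connector uses for table topics.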

Procedure

  1. Review the configuration of the Debezium MySQL connector that you will register.

    Before registering the connector, you should be familiar with its configuration. In the next step, you will register the following connector:

    {
      "name": "inventory-connector", (1)
      "config": { (2)
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1", (3)
        "database.hostname": "mysql", (4)
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "topic.prefix": "dbserver1", (5)
        "database.include.list": "inventory", (6)
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", (7)
        "schema.history.internal.kafka.topic": "schemahistory.inventory" (7)
      }
    }

    (1) The name of the connector.
    (2) The connector’s configuration.
    (3) Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
    (4) The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the host name. If MySQL were running on a normal network, you would specify the IP address or resolvable host name for this value.
    (5) A unique topic prefix. This name will be used as the prefix for all Kafka topics.
    (6) Only changes in the inventory database will be detected.
    (7) The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.

    For more information, see MySQL connector configuration properties.

For security reasons, you shouldn’t put passwords or other secrets in plain text into connector configurations. Instead, any secrets should be externalized via the mechanism defined in KIP-297 (“Externalizing Secrets for Connect Configurations”).

  2. Open a new terminal, and use the curl command to register the Debezium MySQL connector.

    This command uses the Kafka Connect service’s API to submit a POST request against the /connectors resource with a JSON document that describes the new connector (called inventory-connector).

    This command uses localhost to connect to the Docker host. If you are using a non-native Docker platform, replace localhost with the IP address of your Docker host.

    $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'

    Windows users may need to escape the double-quotes. For example:

    $ curl -i -X POST -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"topic.prefix\": \"dbserver1\", \"database.include.list\": \"inventory\", \"schema.history.internal.kafka.bootstrap.servers\": \"kafka:9092\", \"schema.history.internal.kafka.topic\": \"schemahistory.inventory\" } }'

    Otherwise, you might see an error like the following:

    {"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}

    If you use Podman, run the following command:

    $ curl -i -X POST -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "0.0.0.0", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "0.0.0.0:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'

  3. Verify that inventory-connector is included in the list of connectors:

    $ curl -H "Accept:application/json" localhost:8083/connectors/
    ["inventory-connector"]

  4. Review the connector’s tasks:

    $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

    You should see a response similar to the following (formatted for readability):

    HTTP/1.1 200 OK
    Date: Thu, 06 Feb 2020 22:12:03 GMT
    Content-Type: application/json
    Content-Length: 531
    Server: Jetty(9.4.20.v20190813)

    {
      "name": "inventory-connector",
      ...
      "tasks": [
        {
          "connector": "inventory-connector", (1)
          "task": 0
        }
      ]
    }

    (1) The connector is running a single task (task 0) to do its work. The connector only supports a single task, because MySQL records all of its activities in one sequential binlog. This means the connector only needs one reader to get a consistent, ordered view of all of the events.
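
One way to avoid the shell quoting problems described in the registration step is to keep the connector definition in a file and let curl read it with its @file syntax. The following is a sketch under that assumption; the function names and CONNECT_URL are invented for the example, and the JSON is the same configuration used in this tutorial. The last function queries the /connectors/<name>/status resource of the Kafka Connect REST API, which reports the state of the connector and its tasks.

```shell
#!/bin/sh
# Sketch: keep the connector definition in a file and post it with curl's @file syntax,
# which sidesteps shell quoting of the inline JSON. Names here are illustrative.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Write the configuration used in this tutorial to the given file.
write_connector_config() {
  cat > "$1" <<'EOF'
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}
EOF
}

# Submit the file to the /connectors resource.
register_connector() {
  curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    "$CONNECT_URL/connectors/" -d @"$1"
}

# Ask Kafka Connect for the state of the connector and its tasks.
connector_status() {
  curl -s -H "Accept:application/json" "$CONNECT_URL/connectors/$1/status"
}
```

For example, write_connector_config register-mysql.json, then register_connector register-mysql.json, then connector_status inventory-connector. The file name register-mysql.json is arbitrary.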

Watching the connector start

When you register a connector, it generates a large amount of log output in the Kafka Connect container. By reviewing this output, you can better understand the process that the connector goes through from the time it is created until it begins reading the MySQL server’s binlog.

After registering the inventory-connector connector, you can review the log output in the Kafka Connect container (connect) to track the connector’s status.

The first few lines show the connector (inventory-connector) being created and started:

  ...
  2021-11-30 01:38:44,223 INFO || [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks [] [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Rebalance started [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
  2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] (Re-)joining group [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
  2021-11-30 01:38:44,227 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
  2021-11-30 01:38:44,230 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
  2021-11-30 01:38:44,231 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  ...

Further down, you should see output like the following from the connector:

  ...
  2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
  2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
  2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
  2021-11-30 01:38:44,437 INFO || Database schema history topic '(name=schemahistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaSchemaHistory]
  2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-schemahistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
  2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
  2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
  2021-11-30 01:38:44,499 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
  2021-11-30 01:38:44,499 INFO || Reconnecting after finishing schema recovery [io.debezium.connector.mysql.MySqlConnectorTask]
  2021-11-30 01:38:44,524 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
  2021-11-30 01:38:44,525 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
  2021-11-30 01:38:44,526 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
  2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
  2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
  2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot No previous offset has been found [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot According to the connector configuration both schema and data will be snapshotted [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
  ...

The Debezium log output uses mapped diagnostic contexts (MDC) to provide thread-specific information in the log output, and make it easier to understand what is happening in the multi-threaded Kafka Connect service. This includes the connector type (MySQL in the above log messages), the logical name of the connector (dbserver1 above), and the connector’s activity (task, snapshot and binlog).
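
Because the MDC context appears as a literal type|name|activity token in each line, it can be used to narrow the log output to one connector activity. The helper below is a small sketch; filter_mdc is a name made up for this example.

```shell
#!/bin/sh
# Keep only log lines whose MDC context matches <type>|<logical name>|<activity>.
# Reads log text on standard input; -F makes grep treat the '|' characters literally.
# Usage: filter_mdc <connector-type> <logical-name> <activity>
filter_mdc() {
  grep -F "$1|$2|$3"
}
```

For example, docker logs connect 2>&1 | filter_mdc MySQL dbserver1 snapshot would show only the snapshot lines of the connector registered in this tutorial.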

In the log output above, the first few lines involve the task activity of the connector, and report some bookkeeping information (in this case, that the connector was started with no prior offset). The next three lines involve the snapshot activity of the connector, and report that a snapshot is being started using the debezium MySQL user along with the MySQL grants associated with that user.

If the connector is not able to connect, or if it does not see any tables or the binlog, check these grants to ensure that all of those listed above are included.

Next, the connector reports the steps that make up the snapshot operation:

  ...
  2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,548 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,551 INFO MySQL|dbserver1|snapshot Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:44,552 INFO MySQL|dbserver1|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,557 INFO MySQL|dbserver1|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:44,560 INFO MySQL|dbserver1|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '156' and gtid '' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:45,058 INFO MySQL|dbserver1|snapshot Reading structure of database 'inventory' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:45,187 INFO MySQL|dbserver1|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,273 INFO MySQL|dbserver1|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Writes to MySQL tables prevented for a total of 00:00:00.717 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
  2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Snapshotting contents of 6 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.addresses' (1 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,276 INFO MySQL|dbserver1|snapshot For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.customers' (2 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.geom' (3 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,305 INFO MySQL|dbserver1|snapshot For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.orders' (4 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products' (5 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,343 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products_on_hand' (6 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,353 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009' [io.debezium.relational.RelationalSnapshotChangeEventSource]
  2021-11-30 01:38:45,355 INFO MySQL|dbserver1|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
  2021-11-30 01:38:45,356 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
  ...

Each of these steps reports what the connector is doing to perform the consistent snapshot. For example, Step 6 involves reverse engineering the DDL create statements for the tables that are being captured and then releasing the global read lock less than 1 second after acquiring it (the log reports that writes were prevented for 00:00:00.717). Step 7 reads all of the rows in each of the tables and reports the time taken and the number of rows found. In this case, the connector completed its consistent snapshot in just under 1 second.

The snapshot process will take longer with your databases, but the connector outputs enough log messages that you can track what it is working on, even when the tables have a large number of rows. And although the connector holds a global read lock that blocks writes at the beginning of the snapshot process, the lock should not last very long, even for large databases, because it is released before any data is copied. For more information, see the MySQL connector documentation.
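
Because the snapshot messages follow a regular pattern, a few lines of scripting are enough to summarize progress. The following Python sketch is only an illustration: the regular expression is an assumption derived from the sample output above, the helper name summarize_snapshot is hypothetical, and the log format is not a stable interface, so it might differ between Debezium versions.

```python
import re

# Pattern assumed from the sample log lines shown above; not a stable format.
EXPORT_DONE = re.compile(
    r"Finished exporting (?P<rows>\d+) records for table "
    r"'(?P<table>[^']+)'; total duration '(?P<duration>[^']+)'"
)

def summarize_snapshot(log_lines):
    """Return {table: (row_count, duration)} for each finished table export."""
    summary = {}
    for line in log_lines:
        match = EXPORT_DONE.search(line)
        if match:
            summary[match["table"]] = (int(match["rows"]), match["duration"])
    return summary

# Two lines copied from the connector output above.
sample = [
    "2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]",
    "2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]",
]

for table, (rows, duration) in summarize_snapshot(sample).items():
    print(f"{table}: {rows} rows in {duration}")
```

Lines that do not match the pattern are skipped, so you can pass the full connector log to the function.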

Next, Kafka Connect logs several warnings that report an “Error while fetching metadata”. You can safely ignore these warnings: they only mean that new Kafka topics were created and that Kafka had to assign a new leader for each one:

  ...
  2021-11-30 01:38:45,555 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 3 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:45,691 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 9 : {dbserver1.inventory.addresses=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:45,813 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 13 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:45,927 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 18 : {dbserver1.inventory.geom=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:46,043 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 22 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:46,153 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 26 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  2021-11-30 01:38:46,269 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 31 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
  ...

Finally, the log output shows that the connector has transitioned from its snapshot mode into continuously reading the MySQL server’s binlog:

  ...
  2021-11-30 01:38:45,362 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
  ...
  Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
  INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
  2021-11-30 01:38:45,392 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
  2021-11-30 01:38:45,392 INFO MySQL|dbserver1|streaming Waiting for keepalive thread to start [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
  2021-11-30 01:38:45,393 INFO MySQL|dbserver1|binlog Creating thread debezium-mysqlconnector-dbserver1-binlog-client [io.debezium.util.Threads]
  ...

Viewing change events

After you deploy the Debezium MySQL connector, it starts monitoring the inventory database for data change events.

When you watched the connector start up, you saw that events were written to the following topics, all of which use the dbserver1 prefix (the topic prefix that is configured for the connector):

dbserver1

The schema change topic to which all of the DDL statements are written.

dbserver1.inventory.products

Captures change events for the products table in the inventory database.

dbserver1.inventory.products_on_hand

Captures change events for the products_on_hand table in the inventory database.

dbserver1.inventory.customers

Captures change events for the customers table in the inventory database.

dbserver1.inventory.orders

Captures change events for the orders table in the inventory database.
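
The table topics listed above follow the pattern prefix.database.table. As a minimal sketch of that convention, the following Python helpers build and split such names. The function names are hypothetical and are not part of Debezium, and the parser assumes that none of the three parts contains a period.

```python
def table_topic(prefix, database, table):
    """Build the topic name for a table's change events: <prefix>.<database>.<table>."""
    return f"{prefix}.{database}.{table}"

def parse_table_topic(topic):
    """Split a table topic name into (prefix, database, table).

    Assumes that none of the parts contains a period. The schema change topic
    (for example, dbserver1) has only one part and is rejected.
    """
    parts = topic.split(".")
    if len(parts) != 3:
        raise ValueError(f"not a table topic: {topic!r}")
    return tuple(parts)

print(table_topic("dbserver1", "inventory", "customers"))
print(parse_table_topic("dbserver1.inventory.orders"))
```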

For this tutorial, you will explore the dbserver1.inventory.customers topic. In this topic, you will view different types of change events to see how the connector captures each one: a create event, an update event, and a delete event.

Viewing a create event

By viewing the dbserver1.inventory.customers topic, you can see how the MySQL connector captured create events in the inventory database. In this case, the create events capture new customers being added to the database.

Procedure

  1. Open a new terminal, and use it to start the watch-topic utility to watch the dbserver1.inventory.customers topic from the beginning of the topic.

    The watch-topic utility is very simple and limited in functionality. It is not intended to be used by an application to consume events. In that scenario, you would instead use Kafka consumers and the applicable consumer libraries that offer full functionality and flexibility.

    This command runs the watch-topic utility in a new container using the 2.1 version of the debezium/kafka image:

    $ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.1 watch-topic -a -k dbserver1.inventory.customers

    -a

    Watches all events since the topic was created. Without this option, watch-topic would only show the events recorded after you start watching.

    -k

    Specifies that the output should include the event’s key. In this case, this contains the row’s primary key.

    If you use Podman, run the following command:

    $ sudo podman run -it --rm --name watcher --pod dbz quay.io/debezium/kafka:2.1 watch-topic -a -k dbserver1.inventory.customers

    The watch-topic utility returns the event records from the customers table. There are four events, one for each row in the table. Each event is formatted in JSON, because that is how you configured the Kafka Connect service. There are two JSON documents for each event: one for the key, and one for the value.

    You should see output similar to the following:

    Using ZOOKEEPER_CONNECT=172.17.0.2:2181
    Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
    Using KAFKA_BROKER=172.17.0.3:9092
    Contents of topic dbserver1.inventory.customers:
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}
    ...

    This utility keeps watching the topic, so any new events will automatically appear as long as the utility is running.

  2. For the last event, review the details of the key.

    Here are the details of the key of the last event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Key"
      },
      "payload": {
        "id": 1004
      }
    }

    The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).

    The payload has a single id field, with a value of 1004.

    By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.

  3. Review the details of the same event’s value.

    The event’s value shows that the row was created, and describes what it contains (in this case, the id, first_name, last_name, and email of the inserted row).

    Here are the details of the value of the last event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }

    This portion of the event is much longer, but like the event’s key, it also has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:

    op

    A required field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).

    before

    An optional field that, if present, contains the state of the row before the event occurred. The structure will be described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.

    after

    An optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.

    source

    A required field that contains a structure describing the source metadata for the event, which in the case of MySQL, contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and, if available, the MySQL server ID, and the timestamp in seconds.

    ts_ms

    An optional field that, if present, contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.

    The JSON representations of the events are much longer than the rows they describe. This is because, with every event key and value, Kafka Connect ships the schema that describes the payload. Over time, this structure may change. However, having the schemas for the key and the value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.

    The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. Only in this way can each event be structured exactly like the table from which it originated at the time the event occurred. However, the Kafka topic that contains all of the events for a single table might have events that correspond to each state of the table definition.

    The JSON converter includes the key and value schemas in every message, so it produces very verbose events. Alternatively, you can use Apache Avro as a serialization format, which results in far smaller event messages. This is because the Avro converter transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. Thus, when the Avro converter serializes an event message, it places only a unique identifier for the schema along with an Avro-encoded binary representation of the value. As a result, the serialized messages that are transferred over the wire and stored in Kafka are far smaller than what you have seen here. In addition, the Avro converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.

  4. Compare the event’s key and value schemas to the state of the inventory database. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    This shows that the event records you reviewed match the records in the database.
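
The envelope fields described in this procedure can also be read programmatically. The following Python sketch uses only the standard json module to summarize one event from its key and value documents. The helper name describe_event is hypothetical, the sample documents are trimmed copies of the event shown above with the schema sections left empty for brevity, and the sketch does not handle events whose value is empty.

```python
import json

# Operation codes listed above for the MySQL connector.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}

def describe_event(key_json, value_json):
    """Summarize one change event from its JSON key and value documents."""
    key = json.loads(key_json)["payload"]
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return {
        "key": key,
        "operation": OPERATIONS.get(payload["op"], "unknown"),
        "table": f"{source['db']}.{source['table']}",
        "before": payload["before"],
        "after": payload["after"],
    }

# Trimmed copies of the key and value shown above; the schema sections are
# left empty here only to keep the example short.
key_json = '{"schema": {}, "payload": {"id": 1004}}'
value_json = """
{
  "schema": {},
  "payload": {
    "before": null,
    "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "source": {"name": "dbserver1", "db": "inventory", "table": "customers", "snapshot": true},
    "op": "r",
    "ts_ms": 1486500577691
  }
}
"""

summary = describe_event(key_json, value_json)
print(summary["operation"], summary["table"], summary["key"], summary["after"]["first_name"])
```

A real consuming application would receive these documents from a Kafka consumer rather than from string literals.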

Updating the database and viewing the update event

Now that you have seen how the Debezium MySQL connector captured the create events in the inventory database, you will change one of the records and see how the connector captures the change.

By completing this procedure, you will learn how to find details about what changed in a database commit, and how you can compare change events to determine when the change occurred in relation to other changes.

Procedure

  1. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
    Query OK, 1 row affected (0.05 sec)
    Rows matched: 1 Changed: 1 Warnings: 0
  2. View the updated customers table:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
  3. Switch to the terminal running watch-topic to see a new fifth event.

    By changing a record in the customers table, the Debezium MySQL connector generated a new event. You should see two new JSON documents: one for the event’s key, and one for the new event’s value.

    Here are the details of the key for the update event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key",
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    This key is the same as the key for the previous events.

    Here is that new event’s value. There are no changes in the schema section, so only the payload section is shown (formatted for readability):

    {
      "schema": {...},
      "payload": {
        "before": { (1)
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": { (2)
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": { (3)
          "version": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501486,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 364,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "u", (4)
        "ts_ms": 1486501486308 (5)
      }
    }

    (1) The before field now has the state of the row with the values before the database commit.
    (2) The after field now has the updated state of the row, and the first_name value is now Anne Marie.
    (3) The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).
    (4) The op field value is now u, signifying that this row changed because of an update.
    (5) The ts_ms field shows the time stamp for when Debezium processed this event.

    By viewing the payload section, you can learn several important things about the update event:

    • By comparing the before and after structures, you can determine what actually changed in the affected row because of the commit.

    • By reviewing the source structure, you can find information about MySQL’s record of the change (providing traceability).

    • By comparing the payload section of an event to other events in the same topic (or a different topic), you can determine whether the event occurred before, after, or as part of the same MySQL commit as another event.
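
The first and third points can be sketched in a few lines of Python. The helper names are hypothetical, and the ordering key is a simplification: it assumes that both events come from the same MySQL server, that binlog file names are zero-padded (as in mysql-bin.000003), and it ignores GTIDs.

```python
def changed_fields(before, after):
    """Return {column: (old_value, new_value)} for every column that differs."""
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }

def binlog_order_key(source):
    """Sort key for events from one server: binlog file, position, then row.

    Assumes zero-padded binlog file names, so that comparing the names as
    strings matches their numeric order.
    """
    return (source["file"], source["pos"], source["row"])

# Values copied from the update event payload shown above.
before = {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
after = {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
print(changed_fields(before, after))  # {'first_name': ('Anne', 'Anne Marie')}

# Source positions copied from the snapshot event and the update event.
snapshot_source = {"file": "mysql-bin.000003", "pos": 154, "row": 0}
update_source = {"file": "mysql-bin.000003", "pos": 364, "row": 0}
print(binlog_order_key(snapshot_source) < binlog_order_key(update_source))  # True
```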

Deleting a record in the database and viewing the delete event

Now that you have seen how the Debezium MySQL connector captured the create and update events in the inventory database, you will delete one of the records and see how the connector captures the deletion.

By completing this procedure, you will learn how to find details about delete events, and how Kafka uses log compaction to reduce the number of delete events while still enabling consumers to get all of the events.

Procedure

  1. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> DELETE FROM customers WHERE id=1004;
    Query OK, 1 row affected (0.00 sec)

    If the above command fails with a foreign key constraint violation, you must first remove the rows that reference the customer from the addresses table by using the following statement, and then run the DELETE statement for the customers table again:

    mysql> DELETE FROM addresses WHERE customer_id=1004;
  2. Switch to the terminal running watch-topic to see two new events.

    By deleting a row in the customers table, the Debezium MySQL connector generated two new events.

  3. Review the key and value for the first new event.

    Here are the details of the key for the first new event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key",
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    This key is the same as the key in the previous two events you looked at.

    Here is the value of the first new event (formatted for readability):

    {
      "schema": {...},
      "payload": {
        "before": { (1)
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
          "version": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501558,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 725,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "d", (4)
        "ts_ms": 1486501558315 (5)
      }
    }

    (1) The before field now has the state of the row that was deleted with the database commit.
    (2) The after field is null because the row no longer exists.
    (3) The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).
    (4) The op field value is now d, signifying that this row was deleted.
    (5) The ts_ms field shows the time stamp for when Debezium processed this event.

    Thus, this event provides a consumer with the information that it needs to process the removal of the row. The old values are also provided, because some consumers might require them to properly handle the removal.

  4. Review the key and value for the second new event.

    Here is the key for the second new event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key",
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    Once again, this key is exactly the same key as in the previous three events you looked at.

    Here is the value of that same event (formatted for readability):

    {
      "schema": null,
      "payload": null
    }

    This last event is called a tombstone event, because it has a key and an empty value. If Kafka is configured for log compaction, it removes older messages from the topic when at least one later message in the topic has the same key. The tombstone event therefore means that Kafka will remove all prior messages with the same key. Even though the prior messages will be removed, the tombstone event means that consumers can still read the topic from the beginning and not miss any events.
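
To make the effect of compaction concrete, the following Python sketch applies the "keep only the latest message for each key" rule to a small in-memory list. It is a toy model under stated assumptions, not Kafka's implementation: real compaction runs in the background, its timing depends on broker and topic configuration, and Kafka eventually removes tombstones as well. The function name compact is hypothetical.

```python
def compact(records):
    """Keep only the most recent record for each key, preserving log order.

    Each record is a (key, value) pair; a value of None stands for a tombstone.
    """
    latest_index = {}
    for index, (key, _value) in enumerate(records):
        latest_index[key] = index
    return [
        record
        for index, record in enumerate(records)
        if latest_index[record[0]] == index
    ]

# The events for customer 1004 from this tutorial, plus one other customer.
log = [
    (1003, {"op": "r"}),
    (1004, {"op": "r"}),
    (1004, {"op": "u"}),
    (1004, {"op": "d"}),
    (1004, None),  # tombstone: same key, empty value
]

print(compact(log))  # [(1003, {'op': 'r'}), (1004, None)]
```

After compaction, a consumer that reads from the beginning still sees the tombstone for key 1004, so it can tell that the row was deleted.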

Restarting the Kafka Connect service

Now that you have seen how the Debezium MySQL connector captures create, update, and delete events, you will see how it can capture change events that occur while it is not running.

The Kafka Connect service automatically manages tasks for its registered connectors. Therefore, if it goes offline, it starts any non-running tasks when it restarts. This means that Debezium can still report database changes that occur while it is not running: it captures them after it restarts.

In this procedure, you will stop Kafka Connect, change some data in the database, and then restart Kafka Connect to see the change events.

Procedure

  1. Open a new terminal and use it to stop the connect container that is running the Kafka Connect service:

    $ docker stop connect

    The connect container is stopped, and the Kafka Connect service gracefully shuts down.

    Because you ran the container with the --rm option, Docker removes the container once it stops.

  2. While the service is down, switch to the terminal for the MySQL command line client, and add a few records:

    mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
    mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");

    The records are added to the database. However, because Kafka Connect is not running, watch-topic does not record any updates.

    In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas (ISRs) for each topic. If enough brokers failed that a topic no longer had the minimum number of ISRs, Kafka would become unavailable. In this scenario, producers (like the Debezium connectors) and consumers wait for the Kafka cluster or network to recover. This means that, temporarily, your consumers might not see any change events as data is changed in the databases, because no change events are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium resumes producing change events, and your consumers resume consuming events where they left off.

  3. Open a new terminal, and use it to restart the Kafka Connect service in a container.

    This command starts Kafka Connect using the same options you used when you initially started it:

    $ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:2.1

    The Kafka Connect service starts, connects to Kafka, reads the previous service’s configuration, and starts the registered connectors, which resume from where they left off.

    Here are the last few lines from this restarted service:

    ...
    2021-11-30 01:49:07,938 INFO || Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnection]
    2021-11-30 01:49:07,941 INFO || MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask]
    2021-11-30 01:49:07,967 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
    2021-11-30 01:49:07,968 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
    2021-11-30 01:49:07,968 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
    2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
    2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
    2021-11-30 01:49:07,976 INFO MySQL|dbserver1|snapshot A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted. [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
    2021-11-30 01:49:07,977 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=SKIPPED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
    2021-11-30 01:49:07,981 INFO MySQL|dbserver1|streaming Requested thread factory for connector MySqlConnector, id = dbserver1 named = binlog-client [io.debezium.util.Threads]
    2021-11-30 01:49:07,983 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
    ...

    These lines show that the service found the offsets previously recorded by the last task before it was shut down, connected to the MySQL database, started reading the binlog from that position, and generated events from any changes in the MySQL database since that point in time.

  4. Switch to the terminal running watch-topic to see events for the two new records you created when Kafka Connect was offline:

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"2.1.4.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"2.1.4.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}

    These events are create events that are similar to what you saw previously. As you can see, Debezium still reports all of the changes made to a database while it was not running, provided that it is restarted before the MySQL database purges the missed commits from its binlog.
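
The restart behavior in this procedure comes down to a stored position in the binlog. The following Python sketch is a toy model of that idea, not Debezium's code: `read_from` is a hypothetical function, a list stands in for the MySQL binlog, and a dict stands in for the offsets that Kafka Connect stores (in this tutorial, in the my_connect_offsets topic).

```python
def read_from(binlog, first_pos, offsets):
    """Return the entries recorded after the stored offset, then advance it.

    binlog    -- entries the simulated database still retains
    first_pos -- position of binlog[0]; it grows when old entries are purged
    offsets   -- dict standing in for the durable offset store
    """
    start = offsets.get("pos", first_pos)
    if start < first_pos:
        # The entries the reader still needs were purged while it was stopped.
        raise RuntimeError(f"binlog position {start} is no longer available")
    new_entries = binlog[start - first_pos:]
    offsets["pos"] = first_pos + len(binlog)
    return new_entries


offsets = {}
binlog = ["insert 1001", "insert 1002"]
first_run = read_from(binlog, 0, offsets)      # reader is running

# The reader is stopped; the database keeps accepting writes.
binlog += ["insert 1005", "insert 1006"]

after_restart = read_from(binlog, 0, offsets)  # reader restarts with saved offsets
print(after_restart)
```

Because the offset survives the restart, the second call returns only the two entries written while the reader was stopped. If the database had already purged the entries between the stored offset and the oldest retained entry, the function raises an error instead, which mirrors the binlog caveat noted above.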

Cleaning up

After you are finished with the tutorial, you can use Docker to stop all of the running containers.

Procedure

  1. Stop each of the containers:

    $ docker stop mysqlterm watcher connect mysql kafka zookeeper

    Docker stops each container. Because you used the --rm option when you started them, Docker also removes them.

    If you use Podman, run the following commands:

    $ sudo podman pod kill dbz
    $ sudo podman pod rm dbz

  2. Verify that all of the processes have stopped and have been removed:

    $ docker ps -a

    If any of the processes are still running, stop them using docker stop <process-name> or docker stop <containerId>.
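
The verification step can also be scripted. The following Python sketch assumes that you list containers with `docker ps -a --format '{{.Names}}'`, which prints one container name per line. `leftover_containers` and `list_container_names` are hypothetical helpers, and the set of names is taken from the docker stop command in this procedure.

```python
import subprocess

TUTORIAL_CONTAINERS = {"mysqlterm", "watcher", "connect", "mysql", "kafka", "zookeeper"}


def leftover_containers(ps_output):
    """Return the tutorial container names that still appear in the listing."""
    names = {line.strip() for line in ps_output.splitlines() if line.strip()}
    return sorted(names & TUTORIAL_CONTAINERS)


def list_container_names():
    """Run docker and return its output; requires Docker on the PATH."""
    result = subprocess.run(
        ["docker", "ps", "-a", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


# Sample listing used instead of a live call so the sketch runs anywhere.
sample = "kafka\nsome-other-container\nzookeeper\n"
print(leftover_containers(sample))
```

To check a live system, pass the result of `list_container_names()` to `leftover_containers`. An empty list means that every tutorial container was removed.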

Next steps

After completing the tutorial, consider the following next steps:

  • Explore the tutorial further.

    Use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You may need to run a separate watch-topic command for each topic. Keep in mind that you cannot remove a row that is referenced by a foreign key.

  • Try running the tutorial with Debezium connectors for Postgres, MongoDB, SQL Server, and Oracle.

    You can use the Docker Compose version of this tutorial located in the Debezium examples repository. Docker Compose files are provided for running the tutorial with MySQL, Postgres, MongoDB, SQL Server, and Oracle.