Tutorial
Introduction to Debezium
Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases.
Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.
By taking advantage of Kafka’s reliable streaming platform, Debezium makes it possible for applications to consume changes that occur in a database correctly and completely. Even if your application stops unexpectedly, or loses its connection, it does not miss events that occur during the outage. After the application restarts, it resumes reading from the topic from the point where it left off.
The tutorial that follows shows you how to deploy and use the Debezium MySQL connector with a simple configuration. For more information about deploying and using Debezium connectors, see the connector documentation.
Starting the services
Using Debezium requires three separate services: ZooKeeper, Kafka, and the Debezium connector service. In this tutorial, you will set up a single instance of each service using Docker and the Debezium container images.
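To start the services needed for this tutorial, you must:
Start ZooKeeper
Start Kafka
Start a MySQL database
Start a MySQL command line client
Start Kafka Connect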
Considerations for running Debezium with Docker
This tutorial uses Docker and the Debezium container images to run the ZooKeeper, Kafka, Debezium, and MySQL services. Running each service in a separate container simplifies the setup so that you can see Debezium in action.
In a production environment, you would run multiple instances of each service to provide performance, reliability, replication, and fault tolerance. Typically, you would either deploy these services on a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, or you would install on dedicated hardware.
You should be aware of the following considerations for running Debezium with Docker:
The containers for ZooKeeper and Kafka are ephemeral.
Normally, ZooKeeper and Kafka store their data locally inside the containers, and you would mount directories on the host machine as volumes to hold that data. That way, when the containers are stopped, the persisted data remains. However, this tutorial skips this setup: when a container is stopped, all persisted data is lost. This keeps cleanup simple when you complete the tutorial.
For more information about storing persistent data, see the documentation for the container images.
This tutorial requires you to run each service in a different container.
To avoid confusion, you will run each container in the foreground in a separate terminal. This way, all of the output of a container will be displayed in the terminal used to run it.
Docker also allows you to run a container in detached mode (with the -d option), where the container is started and the docker command returns immediately. However, detached mode containers do not display their output in the terminal. To see the output, you would need to use the docker logs --follow <container-name> command. For more information, see the Docker documentation.
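If you run the containers in detached mode, a script can wait for the readiness messages that this tutorial asks you to look for instead of you watching each terminal. The following is a minimal Python sketch, assuming the docker CLI is on your PATH; first_match and wait_for_log are hypothetical helper names, not part of Docker or Debezium.

```python
import re
import subprocess
from typing import Iterable, Optional


def first_match(lines: Iterable[str], pattern: str) -> Optional[str]:
    """Return the first line matching the regex, or None if the lines run out first."""
    regex = re.compile(pattern)
    for line in lines:
        if regex.search(line):
            return line.rstrip("\n")
    return None


def wait_for_log(container: str, pattern: str) -> Optional[str]:
    """Follow a container's logs until a line matches `pattern` (hypothetical helper)."""
    # `docker logs --follow` replays existing output and then streams new lines.
    # Container stderr arrives on our stderr, so merge it into stdout.
    proc = subprocess.Popen(
        ["docker", "logs", "--follow", container],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    try:
        return first_match(proc.stdout, pattern)
    finally:
        # Stop following once we have an answer (or the log stream ended).
        proc.terminate()
        proc.wait()
```

For example, wait_for_log("zookeeper", r"port .*:2181") would block until ZooKeeper prints the port line described in the next section. Keeping the matching logic in first_match makes it testable without Docker.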
Starting ZooKeeper
ZooKeeper is the first service you must start.
Procedure
Open a terminal and use it to start ZooKeeper in a container.
This command runs a new container using version 2.4 of the quay.io/debezium/zookeeper image:
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name zookeeper
The name of the container.
-p 2181:2181 -p 2888:2888 -p 3888:3888
Maps three of the container’s ports to the same ports on the Docker host. This enables other containers (and applications outside of the container) to communicate with ZooKeeper.
If you use Podman, run the following command:
Verify that ZooKeeper started and is listening on port 2181.
You should see output similar to the following:
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181 (1)
1 This line indicates that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
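When scripting the setup, you can also poll the published port before moving on to Kafka. This minimal Python sketch assumes ZooKeeper is reachable on localhost:2181 through the -p 2181:2181 mapping above; wait_for_port is a hypothetical helper. An accepted TCP connection only shows that something is listening, not that ZooKeeper has finished initializing, so the log line above remains the more reliable signal.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds, or give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a connection; success means a listener exists.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("localhost", 2181) returns True once the ZooKeeper container accepts connections.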
Starting Kafka
After starting ZooKeeper, you can start Kafka in a new container.
Debezium 2.5.4.Final has been tested against multiple versions of Kafka Connect. Please refer to the Debezium Test Matrix to determine compatibility between Debezium and Kafka Connect.
Procedure
Open a new terminal and use it to start Kafka in a container.
This command runs a new container using version 2.4 of the quay.io/debezium/kafka image:
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name kafka
The name of the container.
-p 9092:9092
Maps port 9092 in the container to the same port on the Docker host so that applications outside of the container can communicate with Kafka.
--link zookeeper:zookeeper
Tells the container that it can find ZooKeeper in the zookeeper container, which is running on the same Docker host.
If you use Podman, run the following command:
$ sudo podman run -it --rm --name kafka --pod dbz quay.io/debezium/kafka:2.4
In this tutorial, you will always connect to Kafka from within a Docker container. Any of these containers can communicate with the kafka container by linking to it. If you needed to connect to Kafka from outside of a Docker container, you would have to set the -e option to advertise the Kafka address through the Docker host (-e ADVERTISED_HOST_NAME= followed by either the IP address or resolvable host name of the Docker host).
Verify that Kafka started.
You should see output similar to the following:
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started (1)
1 The Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.
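The earlier note about connecting to Kafka from outside Docker can be scripted by adding the ADVERTISED_HOST_NAME variable only when you need it. This is a minimal Python sketch that rebuilds the docker run command from this section; kafka_run_args and as_command_line are hypothetical helpers, and any address you pass in stands for your own Docker host's IP address or resolvable host name.

```python
import shlex
from typing import List, Optional


def kafka_run_args(advertised_host: Optional[str] = None,
                   image: str = "quay.io/debezium/kafka:2.4") -> List[str]:
    """Build the docker run argument list used in this tutorial for the Kafka container."""
    args = ["docker", "run", "-it", "--rm", "--name", "kafka",
            "-p", "9092:9092", "--link", "zookeeper:zookeeper"]
    if advertised_host:
        # Only needed when clients outside Docker must reach Kafka:
        # advertise the Docker host's IP address or resolvable host name.
        args += ["-e", f"ADVERTISED_HOST_NAME={advertised_host}"]
    # Options must come before the image name.
    args.append(image)
    return args


def as_command_line(args: List[str]) -> str:
    """Render an argument list as a shell-safe command line."""
    return shlex.join(args)
```

Building a list rather than a string avoids quoting mistakes, and the list can be passed directly to subprocess.run.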
Starting a MySQL database
At this point, you have started ZooKeeper and Kafka, but you still need a database server from which Debezium can capture changes. In this procedure, you will start a MySQL server with an example database.
Procedure
Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an inventory database.
This command runs a new container using version 2.4 of the quay.io/debezium/example-mysql image, which is based on the mysql:8.2 image. It also defines and populates a sample inventory database:
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.4
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name mysql
The name of the container.
-p 3306:3306
Maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that applications outside of the container can connect to the database server.
-e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw
Sets the password for the MySQL root user and creates a mysqluser user with the password mysqlpw.
If you use Podman, run the following command:
Verify that the MySQL server starts.
The MySQL server starts and stops a few times as the configuration is modified. You should see output similar to the following:
...
[System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server - GPL.
[System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: '::' port: 33060, socket: /var/run/mysqld/mysqlx.sock
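Because the server restarts during initialization, a script that waits for MySQL should not stop at the first "ready for connections" message. This minimal Python sketch assumes the temporary initialization server reports port: 0 because it does not listen on TCP. That matches our understanding of how the official mysql image behaves, but it is an assumption. The sketch returns only the readiness line that reports the expected port. final_ready_line is a hypothetical helper, and the regular expression is based on the sample output above.

```python
import re
from typing import Iterable, Optional

# Matches lines such as:
# /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '...' port: 3306 ...
READY = re.compile(r"mysqld: ready for connections\..*\bport: (\d+)")


def final_ready_line(lines: Iterable[str], port: int = 3306) -> Optional[str]:
    """Return the first mysqld readiness line that reports the expected TCP port."""
    for line in lines:
        match = READY.search(line)
        # Skip readiness messages from the temporary server (assumed to report port 0).
        # The X Plugin line does not contain "mysqld: ready", so it never matches.
        if match and int(match.group(1)) == port:
            return line.strip()
    return None
```

Combined with a log follower, this lets a script proceed only after the final server is accepting connections on port 3306.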
Starting a MySQL command line client
After starting MySQL, you start a MySQL command line client so that you can access the sample inventory database.
Procedure
Open a new terminal, and use it to start the MySQL command line client in a container.
This command runs a new container using the mysql:8.2 image, and defines a shell command to run the MySQL command line client with the correct options:
$ docker run -it --rm --name mysqlterm --link mysql mysql:8.2 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name mysqlterm
The name of the container.
--link mysql
Links the container to the mysql container.
If you use Podman, run the following command:
Verify that the MySQL command line client started.
You should see output similar to the following:
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.27 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
At the mysql> command prompt, switch to the inventory database:
mysql> use inventory;
List the tables in the database:
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.00 sec)
Use the MySQL command line client to explore the database and view the pre-loaded data in the database.
For example:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
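The sh -c command that started the client reads its connection settings from environment variables that Docker's legacy --link option injects into the mysqlterm container. For the link alias mysql, the linked port appears as MYSQL_PORT_3306_TCP_ADDR and MYSQL_PORT_3306_TCP_PORT, and the linked container's own environment appears with a MYSQL_ENV_ prefix. This minimal Python sketch resolves the same values. mysql_link_settings and mysql_client_args are hypothetical helpers, and the sketch assumes a simple alias such as mysql.

```python
import os
from typing import List, Mapping, Optional, Tuple


def mysql_link_settings(env: Optional[Mapping[str, str]] = None,
                        alias: str = "mysql", port: int = 3306) -> Tuple[str, str, str]:
    """Resolve host, port, and root password from legacy Docker link variables."""
    env = os.environ if env is None else env
    prefix = alias.upper()
    host = env[f"{prefix}_PORT_{port}_TCP_ADDR"]
    linked_port = env[f"{prefix}_PORT_{port}_TCP_PORT"]
    # Variables set on the linked container (here via -e MYSQL_ROOT_PASSWORD=...)
    # are exposed to the linking container as <ALIAS>_ENV_<NAME>.
    password = env[f"{prefix}_ENV_MYSQL_ROOT_PASSWORD"]
    return host, linked_port, password


def mysql_client_args(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Build the same mysql client invocation as the tutorial's sh -c command."""
    host, port, password = mysql_link_settings(env)
    return ["mysql", f"-h{host}", f"-P{port}", "-uroot", f"-p{password}"]
```

Passing the password with -p on the command line is what triggers the "Using a password on the command line interface can be insecure" warning shown earlier.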
Starting Kafka Connect
After starting MySQL and connecting to the inventory database with the MySQL command line client, you start the Kafka Connect service. This service exposes a REST API to manage the Debezium MySQL connector.
Procedure
Open a new terminal, and use it to start the Kafka Connect service in a container.
This command runs a new container using the 2.4 version of the quay.io/debezium/connect image:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:2.4
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name connect
The name of the container.
-p 8083:8083
Maps port 8083 in the container to the same port on the Docker host. This enables applications outside of the container to use Kafka Connect’s REST API to set up and manage new connector instances.
-e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses
Sets environment variables required by the Debezium image.
--link kafka:kafka --link mysql:mysql
Links the container to the containers that are running Kafka and the MySQL server.
If you use Podman, run the following command:
If you provide a If this is a problem then set environment variable |
Verify that Kafka Connect started and is ready to accept connections.
You should see output similar to the following:
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
Use the Kafka Connect REST API to check the status of the Kafka Connect service.
Kafka Connect exposes a REST API to manage Debezium connectors. To communicate with the Kafka Connect service, you can use the curl command to send API requests to port 8083 of the Docker host (which you mapped to port 8083 in the connect container when you started Kafka Connect).
These commands use localhost. If you are using a non-native Docker platform (such as Docker Toolbox), replace localhost with the IP address of your Docker host.
Open a new terminal and check the status of the Kafka Connect service:
$ curl -H "Accept:application/json" localhost:8083/
{"version":"3.6.1","commit":"cb8625948210849f"} (1)
1 The response shows that Kafka Connect version 3.6.1 is running.
Check the list of connectors registered with Kafka Connect:
$ curl -H "Accept:application/json" localhost:8083/connectors/
[] (1)
1 No connectors are currently registered with Kafka Connect.
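The two curl checks can also be made from a program using only the Python standard library. This minimal sketch assumes Kafka Connect is reachable at http://localhost:8083, as in the curl commands. get_json and describe_worker are hypothetical helpers, and describe_worker reads only the version and commit fields shown in the response above.

```python
import json
from typing import Any, Dict, List
from urllib.request import Request, urlopen

# Assumption: the same host and port that the curl examples use.
CONNECT_URL = "http://localhost:8083"


def get_json(path: str, base_url: str = CONNECT_URL) -> Any:
    """GET a Kafka Connect REST resource and decode the JSON body."""
    request = Request(base_url + path, headers={"Accept": "application/json"})
    with urlopen(request, timeout=10) as response:
        return json.load(response)


def describe_worker(root: Dict[str, str], connectors: List[str]) -> str:
    """Summarize the responses from GET / and GET /connectors/."""
    names = ", ".join(connectors) if connectors else "none"
    return f"Kafka Connect {root['version']} (commit {root['commit']}); connectors: {names}"
```

For example, print(describe_worker(get_json("/"), get_json("/connectors/"))) summarizes both checks in one line. Separating the HTTP call from the formatting keeps the summary logic testable without a running service.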
Deploying the MySQL connector
After starting the Debezium and MySQL services, you are ready to deploy the Debezium MySQL connector so that it can start monitoring the sample MySQL database (inventory).
At this point, you are running the Debezium services, a MySQL database server with a sample inventory database, and the MySQL command line client that is connected to the database. To deploy the MySQL connector, you must:
Register the MySQL connector to monitor the inventory database
After the connector is registered, it will start monitoring the database server’s binlog, and it will generate change events for each row that changes.
Watch the MySQL connector start
Reviewing the Kafka Connect log output as the connector starts helps you to better understand each task it must complete before it can start monitoring the binlog.
Registering a connector to monitor the inventory database
When you register the Debezium MySQL connector, it starts monitoring the MySQL database server’s binlog. The binlog records all of the database’s transactions (such as changes to individual rows and changes to the schemas). When a row in the database changes, Debezium generates a change event.
In a production environment, you would typically either use the Kafka tools to manually create the necessary topics, including specifying the number of replicas, or you would use the Kafka Connect mechanism for customizing the settings of auto-created topics. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.
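With automatic topic creation, each captured table gets its own change event topic. This minimal Python sketch predicts the data change topic names. It assumes the Debezium MySQL naming scheme of topic prefix, database name, and table name joined by dots, and it uses the topic.prefix value dbserver1 from the connector configuration in the procedure. change_event_topics is a hypothetical helper, and the table list is the output of show tables from the earlier step.

```python
from typing import List

# Tables reported by "show tables" in the inventory database.
INVENTORY_TABLES = ["addresses", "customers", "geom", "orders", "products", "products_on_hand"]


def change_event_topics(prefix: str, database: str, tables: List[str]) -> List[str]:
    """Return the data change topic name for each captured table.

    Assumes the naming scheme <topic.prefix>.<database>.<table>.
    """
    return [f"{prefix}.{database}.{table}" for table in tables]
```

For example, change_event_topics("dbserver1", "inventory", INVENTORY_TABLES) includes dbserver1.inventory.customers, the topic to which changes to the customers table would be written.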
Procedure
Review the configuration of the Debezium MySQL connector that you will register.
Before registering the connector, you should be familiar with its configuration. In the next step, you will register the following connector:
{
"name": "inventory-connector", (1)
"config": { (2)
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1", (3)
"database.hostname": "mysql", (4)
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054", (5)
"topic.prefix": "dbserver1", (5)
"database.include.list": "inventory", (6)
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092", (7)
"schema.history.internal.kafka.topic": "schemahistory.inventory" (7)
}
}
1 The name of the connector.
2 The connector’s configuration.
3 Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
4 The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the host name. If MySQL were running on a normal network, you would specify the IP address or resolvable host name for this value.
5 A unique topic prefix. This name will be used as the prefix for all Kafka topics.
6 Only changes in the inventory database will be detected.
7 The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
For more information, see MySQL connector configuration properties.
For security reasons, you shouldn’t put passwords or other secrets in plain text into connector configurations. Instead, any secrets should be externalized via the mechanism defined in KIP-297 (“Externalizing Secrets for Connect Configurations”).
Open a new terminal, and use the curl command to register the Debezium MySQL connector.
This command uses the Kafka Connect service’s API to submit a POST request against the /connectors resource with a JSON document that describes the new connector (called inventory-connector).
This command uses localhost to connect to the Docker host. If you are using a non-native Docker platform, replace localhost with the IP address of your Docker host.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'
Windows users may need to escape the double quotes. For example:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"topic.prefix\": \"dbserver1\", \"database.include.list\": \"inventory\", \"schema.history.internal.kafka.bootstrap.servers\": \"kafka:9092\", \"schema.history.internal.kafka.topic\": \"schemahistory.inventory\" } }'
Otherwise, you might see an error like the following:
{"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}
If you use Podman, run the following command:
Verify that inventory-connector is included in the list of connectors:
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
Review the connector’s tasks:
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
You should see a response similar to the following (formatted for readability):
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)
{
"name": "inventory-connector",
...
"tasks": [
{
"connector": "inventory-connector", (1)
"task": 0
}
]
}
1 The connector is running a single task (task 0) to do its work. The connector only supports a single task, because MySQL records all of its activities in one sequential binlog. This means the connector only needs one reader to get a consistent, ordered view of all of the events.
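The registration and verification steps in this procedure can also be scripted. Building the request body with json.dumps avoids the shell-quoting differences described for Windows. This minimal Python sketch assumes Kafka Connect is reachable at http://localhost:8083. It also uses the standard Kafka Connect status resource, /connectors/<name>/status, which this tutorial does not otherwise show. call and all_running are hypothetical helpers.

```python
import json
from typing import Any, Dict
from urllib.request import Request, urlopen

# Assumption: the same host and port that the curl examples use.
CONNECT_URL = "http://localhost:8083"

# The same registration document that the curl command submits.
INVENTORY_CONNECTOR: Dict[str, Any] = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "topic.prefix": "dbserver1",
        "database.include.list": "inventory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schemahistory.inventory",
    },
}


def call(method: str, path: str, body: Any = None, base_url: str = CONNECT_URL) -> Any:
    """Send a request to the Kafka Connect REST API and decode the JSON response."""
    # json.dumps produces correctly quoted JSON, so no shell escaping is involved.
    data = None if body is None else json.dumps(body).encode("utf-8")
    request = Request(
        base_url + path,
        data=data,
        method=method,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    with urlopen(request, timeout=10) as response:
        return json.load(response)


def all_running(status: Dict[str, Any]) -> bool:
    """True if the connector and every one of its tasks report the RUNNING state."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
```

A typical sequence is call("POST", "/connectors/", INVENTORY_CONNECTOR) followed by all_running(call("GET", "/connectors/inventory-connector/status")). Note that urlopen raises urllib.error.HTTPError for error responses, for example if a connector with the same name is already registered.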
Watching the connector start
When you register a connector, it generates a large amount of log output in the Kafka Connect container. By reviewing this output, you can better understand the process that the connector goes through from the time it is created until it begins reading the MySQL server’s binlog.
After registering the inventory-connector connector, you can review the log output in the Kafka Connect container (connect) to track the connector’s status.
The first few lines show the connector (inventory-connector) being created and started:
...
2021-11-30 01:38:44,223 INFO || [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks [] [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Rebalance started [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] (Re-)joining group [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,227 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,230 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,231 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...
Further down, you should see output like the following from the connector:
...
2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,437 INFO || Database schema history topic '(name=schemahistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-schemahistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Reconnecting after finishing schema recovery [io.debezium.connector.mysql.MySqlConnectorTask]
2021-11-30 01:38:44,524 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,525 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,526 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot No previous offset has been found [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot According to the connector configuration both schema and data will be snapshotted [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
...
The Debezium log output uses mapped diagnostic contexts (MDC) to provide thread-specific information in the log output and make it easier to understand what is happening in the multi-threaded Kafka Connect service. This includes the connector type (MySQL in the above log messages), the logical name of the connector (dbserver1 above), and the connector’s activity (task, snapshot, and binlog).
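Because the MDC fields are separated by vertical bars, they are easy to extract when you filter the Kafka Connect output. This minimal Python sketch infers the line layout (timestamp, level, type|name|activity, message, [logger]) from the sample output in this tutorial, not from a documented format, so treat the regular expression as an assumption. parse_line and LogEntry are hypothetical names. Lines without an MDC, such as the Kafka client lines, parse with empty context fields.

```python
import re
from typing import NamedTuple, Optional

# Layout inferred from the sample output:
# <date> <time> <LEVEL> <type>|<logical name>|<activity> <message> [<logger>]
LINE = re.compile(
    r"^(?P<ts>\S+ \S+)\s+(?P<level>[A-Z]+)\s+"
    r"(?P<type>[^|\s]*)\|(?P<name>[^|\s]*)\|(?P<activity>\S*)\s+"
    r"(?P<message>.*?)\s+\[(?P<logger>[^\]]+)\]$"
)


class LogEntry(NamedTuple):
    connector_type: str
    logical_name: str
    activity: str
    message: str
    logger: str


def parse_line(line: str) -> Optional[LogEntry]:
    """Split a log line into its MDC fields, message, and logger; None if it doesn't fit."""
    match = LINE.match(line.strip())
    if match is None:
        return None
    return LogEntry(match["type"], match["name"], match["activity"],
                    match["message"], match["logger"])
```

Filtering on entry.activity == "snapshot", for example, isolates the snapshot progress messages from the rest of the worker output.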
In the log output above, the first few lines report bookkeeping information as the connector task starts (for example, the Kafka client version and the creation of the schemahistory.inventory schema history topic). The lines tagged with the snapshot activity then report that no previous offset was found, so the connector will snapshot both the schema and the data.
If the connector is not able to connect, or if it does not see any tables or the
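The snapshot reports each numbered step, and each completed table export, on its own line. That makes it possible to pull a compact progress summary out of the Kafka Connect output. This minimal Python sketch collects the step names and the per-table record counts. Its regular expressions match only the message wording shown in this tutorial, which is an assumption rather than a stable format. snapshot_summary is a hypothetical helper.

```python
import re
from typing import Dict, Iterable, List, Tuple

# Message wording taken from the sample output; other versions may differ.
STEP = re.compile(r"Snapshot step (\d+) - (.+?) \[")
EXPORTED = re.compile(r"Finished exporting (\d+) records for table '([^']+)'")


def snapshot_summary(lines: Iterable[str]) -> Tuple[List[str], Dict[str, int]]:
    """Return the snapshot steps seen, in order, and the record count per exported table."""
    steps: List[str] = []
    counts: Dict[str, int] = {}
    for line in lines:
        step = STEP.search(line)
        if step:
            # Non-greedy match stops at the first " [", before table lists or the logger.
            steps.append(f"{step.group(1)}: {step.group(2)}")
        exported = EXPORTED.search(line)
        if exported:
            counts[exported.group(2)] = int(exported.group(1))
    return steps, counts
```

Applied to the sample output, the count for inventory.customers is 4, which matches the four rows that the earlier SELECT * FROM customers query returned.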
Next, the connector reports the steps that make up the snapshot operation:
...
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,548 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,551 INFO MySQL|dbserver1|snapshot Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,552 INFO MySQL|dbserver1|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,557 INFO MySQL|dbserver1|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,560 INFO MySQL|dbserver1|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '156' and gtid '' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,058 INFO MySQL|dbserver1|snapshot Reading structure of database 'inventory' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,187 INFO MySQL|dbserver1|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,273 INFO MySQL|dbserver1|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Writes to MySQL tables prevented for a total of 00:00:00.717 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Snapshotting contents of 6 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.addresses' (1 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,276 INFO MySQL|dbserver1|snapshot For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.customers' (2 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.geom' (3 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,305 INFO MySQL|dbserver1|snapshot For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.orders' (4 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products' (5 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,343 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products_on_hand' (6 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,353 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,355 INFO MySQL|dbserver1|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2021-11-30 01:38:45,356 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Each of these steps reports what the connector is doing to perform the consistent snapshot. For example, step 6 persists the schema history that the connector built by reading the structure (the DDL CREATE statements) of the tables that are being captured. The connector then releases the global read lock less than 1 second after acquiring it (the log reports that writes were prevented for 00:00:00.717), and step 7 reads all of the rows in each of the tables and reports the time taken and the number of rows found. In this case, the connector completed its consistent snapshot in just under 1 second.
The snapshot process will probably take longer with your own databases, but the connector outputs enough log messages that you can track what it is working on, even when the tables have a large number of rows. And although a global read lock, which blocks writes, is acquired at the beginning of the snapshot process, it should not be held for long, even for large databases, because the lock is released before any data is copied. For more information, see the MySQL connector documentation.
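If you want to follow a long-running snapshot without reading every log line, a small filter over the Kafka Connect log can summarize the per-table results. The following Python sketch relies only on the format of the "Finished exporting ... records" lines shown above; the function name snapshot_progress is hypothetical, and the regular expression may need adjusting for other Debezium versions.

```python
import re

# Matches the per-table summary lines emitted during the snapshot, e.g.
# "Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02'"
FINISHED = re.compile(
    r"Finished exporting (\d+) records for table '([^']+)'; total duration '([^']+)'"
)


def snapshot_progress(log_lines):
    """Return {table: (row_count, duration)} for every table whose export finished."""
    progress = {}
    for line in log_lines:
        match = FINISHED.search(line)
        if match:
            count, table, duration = match.groups()
            progress[table] = (int(count), duration)
    return progress
```

For example, you might save the output of `docker logs connect` to a file and pass its lines to snapshot_progress(); tables that are still being exported simply do not appear in the result yet.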
Next, Kafka Connect reports some “errors”. However, you can safely ignore these warnings: these messages just mean that new Kafka topics were created and that Kafka had to assign a new leader for each one:
...
2021-11-30 01:38:45,555 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 3 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,691 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 9 : {dbserver1.inventory.addresses=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,813 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 13 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,927 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 18 : {dbserver1.inventory.geom=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,043 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 22 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,153 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 26 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,269 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 31 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
...
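The topic names inside these warnings are a convenient record of which topics the connector just created. As a sketch, assuming the warning format shown above, the following Python function extracts them in first-seen order; created_topics is a hypothetical helper, not part of Debezium or Kafka.

```python
import re

# Topic names use letters, digits, '.', '_' and '-'; each warning pairs one
# with LEADER_NOT_AVAILABLE, e.g. "{dbserver1.inventory.addresses=LEADER_NOT_AVAILABLE}"
NEW_TOPIC = re.compile(r"([A-Za-z0-9._-]+)=LEADER_NOT_AVAILABLE")


def created_topics(log_lines):
    """Return topic names from LEADER_NOT_AVAILABLE warnings, without duplicates."""
    topics = []
    for line in log_lines:
        for topic in NEW_TOPIC.findall(line):
            if topic not in topics:
                topics.append(topic)
    return topics
```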
Finally, the log output shows that the connector has transitioned from its snapshot mode into continuously reading the MySQL server’s binlog:
...
2021-11-30 01:38:45,362 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|streaming Waiting for keepalive thread to start [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,393 INFO MySQL|dbserver1|binlog Creating thread debezium-mysqlconnector-dbserver1-binlog-client [io.debezium.util.Threads]
...
Viewing change events
After you deploy the Debezium MySQL connector, it starts monitoring the inventory database for data change events.
When you watched the connector start up, you saw that events were written to the following topics with the dbserver1 prefix (the logical name that the connector configuration assigns to the MySQL server; the connector itself is named inventory-connector):
dbserver1
The schema change topic to which all of the DDL statements are written.
dbserver1.inventory.products
Captures change events for the products table in the inventory database.
dbserver1.inventory.products_on_hand
Captures change events for the products_on_hand table in the inventory database.
dbserver1.inventory.customers
Captures change events for the customers table in the inventory database.
dbserver1.inventory.orders
Captures change events for the orders table in the inventory database.
dbserver1.inventory.addresses
Captures change events for the addresses table in the inventory database.
dbserver1.inventory.geom
Captures change events for the geom table in the inventory database.
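The topic names above follow a visible pattern: the prefix alone for schema changes, and prefix.database.table for row-level changes. A minimal Python sketch of that pattern follows; it assumes the connector's default topic naming (no topic-routing transformations), and both helper names are hypothetical.

```python
def change_topic(prefix: str, database: str, table: str) -> str:
    """Name of the topic that holds row-level change events for one table."""
    return f"{prefix}.{database}.{table}"


def schema_change_topic(prefix: str) -> str:
    """Name of the topic that receives the DDL (schema change) events."""
    return prefix
```

For example, change_topic("dbserver1", "inventory", "customers") yields the topic that the rest of this tutorial watches.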
For this tutorial, you will explore the dbserver1.inventory.customers topic. In this topic, you will examine different types of change events to see how the connector captured them:
Viewing a create event
By viewing the dbserver1.inventory.customers topic, you can see how the MySQL connector captured create events in the inventory database. In this case, the create events capture new customers being added to the database.
Procedure
Open a new terminal, and use it to start the watch-topic utility to watch the dbserver1.inventory.customers topic from the beginning of the topic.
The watch-topic utility is very simple and limited in functionality. It is not intended to be used by an application to consume events. In that scenario, you would instead use Kafka consumers and the applicable consumer libraries that offer full functionality and flexibility.
This command runs the watch-topic utility in a new container using the 2.4 version of the debezium/kafka image:
$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.customers
-a
Watches all events since the topic was created. Without this option, watch-topic would only show the events recorded after you start watching.
-k
Specifies that the output should include the event’s key. In this case, the key contains the row’s primary key.
If you use Podman, run the following command:
$ sudo podman run -it --rm --name watcher --pod dbz quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.customers
The watch-topic utility returns the event records from the customers table. There are four events, one for each row in the table. Each event is formatted in JSON, because that is how you configured the Kafka Connect service. There are two JSON documents for each event: one for the key, and one for the value.
You should see output similar to the following:
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}
...
This utility keeps watching the topic, so any new events will automatically appear as long as the utility is running.
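As noted earlier, watch-topic is not meant for applications; a real consumer would use a Kafka client library instead. The following sketch assumes the confluent-kafka Python client and a broker address reachable from wherever the script runs. Both are assumptions: localhost:9092 is only a placeholder, and because the broker in this tutorial advertises a container IP address, you may need to run the script in a container on the same Docker network. The consumer group name is hypothetical. Setting auto.offset.reset to earliest makes a new consumer group start at the beginning of the topic, which is roughly what watch-topic -a does.

```python
import json


def decode_record(raw_key, raw_value):
    """Decode one JSON-converter record into (key_payload, value_payload).

    A record with a null value (or a null payload) decodes to value_payload None.
    """
    key = json.loads(raw_key)["payload"] if raw_key is not None else None
    value = json.loads(raw_value)["payload"] if raw_value is not None else None
    return key, value


def watch(topic="dbserver1.inventory.customers", servers="localhost:9092"):
    """Print every event on `topic`, starting from the beginning of the topic."""
    # Imported here so that decode_record() works without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": servers,     # placeholder broker address (assumption)
        "group.id": "tutorial-watcher",   # hypothetical consumer group name
        "auto.offset.reset": "earliest",  # new group starts at the first offset
    })
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # no record within the timeout
            if msg.error():
                print(f"consumer error: {msg.error()}")
                continue
            print(decode_record(msg.key(), msg.value()))
    finally:
        consumer.close()
```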
For the last event, review the details of the key.
Here are the details of the key of the last event (formatted for readability):
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"dbserver1.inventory.customers.Key"
},
"payload":{
"id":1004
}
}
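Because the key carries its own schema, a consumer can find the primary key columns without hard-coding them. A minimal Python sketch, assuming the JSON converter output shown here (schemas included in each message); primary_key is a hypothetical helper name:

```python
import json

# The key document of the last event, as printed by watch-topic.
KEY = """{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],
"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}"""


def primary_key(key_document: str) -> dict:
    """Return {column: value} for each key column declared in the key schema."""
    doc = json.loads(key_document)
    columns = [field["field"] for field in doc["schema"]["fields"]]
    return {column: doc["payload"][column] for column in columns}
```

Calling primary_key(KEY) returns the id column and its value, which identifies the affected row in inventory.customers.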
The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).
The payload has a single id field, with a value of 1004.
By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.
Review the details of the same event’s value.
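The event’s value shows that the row was created, and describes what it contains (in this case, the id, first_name, last_name, and email of the inserted row). Because this row was captured during the initial snapshot, its op field is r (read) rather than c, and its source.snapshot field is true.
Here are the details of the value of the last event (formatted for readability):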
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "2.5.4.Final",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "inventory",
"table": "customers"
},
"op": "r",
"ts_ms": 1486500577691
}
}
This portion of the event is much longer, but like the event’s key, it also has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:
op
A required field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).
before
An optional field that, if present, contains the state of the row before the event occurred. The structure is described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.
after
An optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.
source
A required field that contains a structure describing the source metadata for the event. For MySQL, it contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and, if available, the MySQL server ID and the timestamp in seconds.
ts_ms
An optional field that, if present, contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
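The op codes and the before, after, and source fields described above are enough to summarize any event from this topic. In the following sketch (hypothetical helper names; it assumes the value payload has already been parsed from JSON), a delete reports the before image, and every other operation reports the after image:

```python
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe(value_payload: dict) -> str:
    """Summarize a change event's value payload in one line."""
    op = value_payload["op"]
    source = value_payload["source"]
    # A delete has no after image, so report the row state it removed.
    row = value_payload["before"] if op == "d" else value_payload["after"]
    return f"{OPERATIONS[op]} {source['db']}.{source['table']} {row} at {source['file']}:{source['pos']}"
```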
The JSON representations of the events are much longer than the rows they describe. This is because, with every event key and value, Kafka Connect ships the schema that describes the payload. Over time, this structure may change. However, having the schemas for the key and the value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This ensures that each event is structured exactly like the table it originated from at the time the event occurred. As a result, the Kafka topic containing all of the events for a single table might contain events that correspond to each state of the table definition.
The JSON converter includes the key and value schemas in every message, so it produces very verbose events. Alternatively, you can use Apache Avro as a serialization format, which results in far smaller event messages. This is because the Avro converter transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. Thus, when the Avro converter serializes an event message, it includes only a unique identifier for the schema along with an Avro-encoded binary representation of the value. As a result, the serialized messages that are transferred over the wire and stored in Kafka are far smaller than what you have seen here. In addition, the Avro converter can use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
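To make the size difference concrete: with a Schema Registry, each serialized message carries only a small header in front of the Avro-encoded body. The sketch below assumes the Confluent Schema Registry wire format (a magic byte of 0 followed by a 4-byte big-endian schema ID); other registries may frame messages differently, and the function name is hypothetical. Decoding the Avro body itself requires the schema fetched from the registry and is not shown.

```python
import struct


def split_confluent_frame(message: bytes):
    """Split a Schema Registry-framed message into (schema_id, avro_body).

    Assumes the Confluent wire format: byte 0 is the magic byte (0) and
    bytes 1-4 hold the schema ID as a big-endian unsigned 32-bit integer.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent Schema Registry framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]
```

Compared with the JSON documents above, the per-message overhead here is a fixed 5 bytes, because the schema itself lives in the registry.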
Compare the event’s key and value to the state of the inventory database. In the terminal that is running the MySQL command line client, run the following statement:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
This shows that the event records you reviewed match the records in the database.
Updating the database and viewing the update event
Now that you have seen how the Debezium MySQL connector captured the create events in the inventory database, you will change one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about what changed in a database commit, and how you can compare change events to determine when the change occurred in relation to other changes.
Procedure
In the terminal that is running the MySQL command line client, run the following statement:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
View the updated customers table:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Switch to the terminal running watch-topic to see a new fifth event.
By changing a record in the customers table, the Debezium MySQL connector generated a new event. You should see two new JSON documents: one for the event’s key, and one for the new event’s value.
Here are the details of the key for the update event (formatted for readability):
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
This key is the same as the key for the previous events.
Here is that new event’s value. There are no changes in the schema section, so only the payload section is shown (formatted for readability):
{
"schema": {...},
"payload": {
"before": { (1)
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "2.5.4.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u", (4)
"ts_ms": 1486501486308 (5)
}
}
Table 1. Descriptions of fields in the payload of an update event value
Item Description
1 The before field shows the values present in the row before the database commit. The original first_name value is Anne.
2 The after field shows the state of the row after the change event. The first_name value is now Anne Marie.
3 The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances). Because this event was read from the binlog rather than captured during the snapshot, the server_id and thread fields are now populated and snapshot is no longer true.
4 The op field value is now u, signifying that this row changed because of an update.
5 The ts_ms field shows a timestamp that indicates when Debezium processed this event.
By viewing the payload section, you can learn several important things about the update event:
By comparing the before and after structures, you can determine what actually changed in the affected row because of the commit.
By reviewing the source structure, you can find information about MySQL’s record of the change (providing traceability).
By comparing the payload section of an event to other events in the same topic (or a different topic), you can determine whether the event occurred before, after, or as part of the same MySQL commit as another event.
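The first and last points above can be made concrete. The following Python sketch (hypothetical helper names; payloads assumed already parsed from JSON) lists the columns whose values differ between before and after, and builds a sort key from the source block’s binlog file number, position, and row. Ordering by binlog coordinates is an assumption that holds only for events read from a single MySQL server’s binlog; it does not cover GTID-based failover between servers.

```python
import re


def changed_columns(value_payload: dict) -> dict:
    """Map each column whose value differs to its (before, after) pair."""
    before = value_payload["before"] or {}
    after = value_payload["after"] or {}
    return {
        column: (before.get(column), after.get(column))
        for column in before.keys() | after.keys()
        if before.get(column) != after.get(column)
    }


def binlog_position(value_payload: dict) -> tuple:
    """Sort key (binlog file number, position, row) taken from the source block."""
    source = value_payload["source"]
    # "mysql-bin.000003" -> 3; the numeric suffix orders binlog files.
    file_number = int(re.search(r"\.(\d+)$", source["file"]).group(1))
    return (file_number, source["pos"], source["row"])
```

For the update event above, changed_columns reports only first_name, and sorting events by binlog_position places this update (pos 364) before the later delete of the same row.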
Deleting a record in the database and viewing the delete event
Now that you have seen how the Debezium MySQL connector captured the create and update events in the inventory database, you will delete one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about delete events, and how Kafka uses log compaction to reduce the number of delete events while still enabling consumers to get all of the events.
Procedure
In the terminal that is running the MySQL command line client, run the following statement:
mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
If the above command fails with a foreign key constraint violation, you must first remove the customer’s addresses from the addresses table by using the following statement, and then run the DELETE statement again:
mysql> DELETE FROM addresses WHERE customer_id=1004;
Switch to the terminal running watch-topic to see two new events.
By deleting a row in the customers table, the Debezium MySQL connector generated two new events.
Review the key and value for the first new event.
Here are the details of the key for the first new event (formatted for readability):
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
This key is the same as the key in the previous two events you looked at.
Here is the value of the first new event (formatted for readability):
{
"schema": {...},
"payload": {
"before": { (1)
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "2.5.4.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501558,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 725,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "d", (4)
"ts_ms": 1486501558315 (5)
}
}
1 The before field now has the state of the row that was deleted with the database commit.
2 The after field is null because the row no longer exists.
3 The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).
4 The op field value is now d, signifying that this row was deleted.
5 The ts_ms field shows the timestamp for when Debezium processed this event.
Thus, this event provides a consumer with the information that it needs to process the removal of the row. The old values are also provided, because some consumers might require them to properly handle the removal.
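A consumer that maintains its own copy of the customers table can apply these events in order. The sketch below assumes decoded key and value payloads (as dictionaries) and uses a plain dictionary as a hypothetical stand-in for the target store. It writes the after image for c, r, and u events, removes the row for d events, and skips any record whose value is null, such as the second event that Debezium emits for this delete.

```python
def apply_event(table: dict, key: dict, value_payload):
    """Apply one change event to an in-memory copy of a table, keyed by primary key."""
    pk = tuple(sorted(key.items()))  # e.g. (("id", 1004),)
    if value_payload is None:
        return  # null value: nothing to apply, the delete event already removed the row
    if value_payload["op"] == "d":
        table.pop(pk, None)
    else:  # "c", "r", or "u": the after image is the new row state
        table[pk] = value_payload["after"]
```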
Review the key and value for the second new event.
Here is the key for the second new event (formatted for readability):
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
Once again, this key is exactly the same key as in the previous three events you looked at.
Here is the value of that same event (formatted for readability):
{
"schema": null,
"payload": null
}
If Kafka is set up to be log compacted, it will remove older messages from the topic if there is at least one message later in the topic with the same key. This last event is called a tombstone event, because it has a key and an empty (null) value. This means that Kafka will remove all prior messages with the same key. Even though the prior messages will be removed, the tombstone event means that consumers can still read the topic from the beginning without missing the fact that the row was deleted.
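The effect of compaction on this key can be simulated in a few lines. This is a simplified model, not Kafka’s implementation: it keeps the latest record per key and preserves offset order, but it ignores log segments and the fact that brokers eventually remove the tombstone itself after the topic’s delete.retention.ms period. The function name compact is hypothetical.

```python
def compact(records):
    """Keep only the latest (key, value) record for each key, in offset order.

    `records` is a list of (key, value) pairs ordered by offset; a value of
    None represents a tombstone.
    """
    latest = {}
    for offset, (key, value) in enumerate(records):
        latest[key] = (offset, value)  # a later offset overwrites an earlier one
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_offset, value) in survivors]
```

Applied to the four events for key 1004 (create, update, delete, tombstone), only the tombstone survives, while records for other keys are untouched.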
Restarting the Kafka Connect service
Now that you have seen how the Debezium MySQL connector captures create, update, and delete events, you will now see how it can capture change events even when it is not running.
The Kafka Connect service automatically manages tasks for its registered connectors. Therefore, if the service goes offline, it starts any non-running tasks when it restarts. This means that even if Debezium is not running while changes are made to a database, it can still report those changes once it is running again.
In this procedure, you will stop Kafka Connect, change some data in the database, and then restart Kafka Connect to see the change events.
Procedure
Open a new terminal and use it to stop the connect container that is running the Kafka Connect service:
$ docker stop connect
The connect container is stopped, and the Kafka Connect service gracefully shuts down.
Because you ran the container with the --rm option, Docker removes the container once it stops.
While the service is down, switch to the terminal for the MySQL command line client, and add a few records:
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
The records are added to the database. However, because Kafka Connect is not running, watch-topic does not show any new events.
In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas (ISRs) for each topic. Therefore, if enough brokers failed that fewer than the minimum number of ISRs remained, Kafka would become unavailable. In this scenario, producers (like the Debezium connectors) and consumers wait for the Kafka cluster or network to recover. This means that, temporarily, your consumers might not see any change events as data is changed in the databases, because no change events are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium resumes producing change events, and your consumers resume consuming events where they left off.
Open a new terminal, and use it to restart the Kafka Connect service in a container.
This command starts Kafka Connect using the same options you used when you initially started it:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:2.4
The Kafka Connect service starts, connects to Kafka, reads the previous service’s configuration, and starts the registered connectors that will resume where they last left off.
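Besides reading the log, you can confirm that the connector and its task are running again through the Kafka Connect REST API on port 8083, which the command above publishes. The sketch assumes the connector is registered as inventory-connector (the name that appears in the log output) and that GET /connectors/<name>/status returns connector and tasks objects that each carry a state field; the helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def all_running(status: dict) -> bool:
    """True if the connector and every one of its (at least one) tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status["connector"]["state"] == "RUNNING"
        and bool(tasks)
        and all(task["state"] == "RUNNING" for task in tasks)
    )


def fetch_status(name="inventory-connector", base_url="http://localhost:8083"):
    """Fetch and parse GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)
```

For example, all_running(fetch_status()) should become true shortly after the restarted service logs that the source task finished initialization.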
Here are the last few lines from this restarted service:
...
2021-11-30 01:49:07,938 INFO || Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnection]
2021-11-30 01:49:07,941 INFO || MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask]
2021-11-30 01:49:07,967 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:49:07,968 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:49:07,968 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,976 INFO MySQL|dbserver1|snapshot A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted. [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:49:07,977 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=SKIPPED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,981 INFO MySQL|dbserver1|streaming Requested thread factory for connector MySqlConnector, id = dbserver1 named = binlog-client [io.debezium.util.Threads]
2021-11-30 01:49:07,983 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
These lines show that the service found the offsets previously recorded by the last task before it was shut down, connected to the MySQL database, started reading the binlog from that position, and generated events from any changes in the MySQL database since that point in time.
Switch to the terminal running watch-topic
to see events for the two new records you created when Kafka Connect was offline:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"2.5.4.Final","name":"dbserver1","server_id":223344,"ts_sec":14
90635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"2.5.4.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
These are create events, similar to the ones you saw previously. As you can see, Debezium still reports all of the changes in a database that occur while it is not running, as long as it is restarted before the MySQL database purges the missed commits from its binlog.
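The resume condition above can be checked by hand. The following Python sketch is a simplified illustration, not part of Debezium: it extracts the restart binlog file and position from the connector's "Snapshot ended with SnapshotResult" log line and checks whether that file is still listed in the Log_name column of the MySQL SHOW BINARY LOGS statement. The function names are hypothetical, and the check ignores GTID-based positioning, which a real deployment might rely on instead.

```python
import re

# Matches the restart offset fields in the connector's
# "Snapshot ended with SnapshotResult [...]" log line.
_OFFSET_RE = re.compile(
    r"restartBinlogFilename=(?P<file>[^,\]]+), restartBinlogPosition=(?P<pos>\d+)"
)


def extract_restart_offset(log_line):
    """Return (binlog_filename, position) found in a log line, or None."""
    match = _OFFSET_RE.search(log_line)
    if match is None:
        return None
    return match.group("file"), int(match.group("pos"))


def offset_still_available(binlog_filename, available_binlogs):
    """Simplified check: is the binlog file holding the offset still on the server?

    available_binlogs is assumed to be the Log_name values returned by
    SHOW BINARY LOGS. If the file has been purged, the changes recorded
    in it can no longer be read after a restart.
    """
    return binlog_filename in set(available_binlogs)


if __name__ == "__main__":
    line = ("... restartBinlogFilename=mysql-bin.000003, "
            "restartBinlogPosition=156, restartRowsToSkip=0, ...")
    offset = extract_restart_offset(line)
    print(offset)  # ('mysql-bin.000003', 156)
    print(offset_still_available(offset[0], ["mysql-bin.000003", "mysql-bin.000004"]))  # True
    print(offset_still_available(offset[0], ["mysql-bin.000004"]))  # False
```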
Cleaning up
After you are finished with the tutorial, you can use Docker to stop all of the running containers.
Procedure
Stop each of the containers:
$ docker stop mysqlterm watcher connect mysql kafka zookeeper
Docker stops each container. Because you used the --rm option when you started them, Docker also removes them.
If you use Podman, run the following command:
Verify that all of the processes have stopped and have been removed:
$ docker ps -a
If any of the processes are still running, stop them by using docker stop <process-name> or docker stop <containerId>.
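To script this verification step, the following Python sketch (an illustration, not part of the tutorial) runs docker ps -a --format '{{.Names}}' and reports which of the tutorial's containers still exist. The helper names are hypothetical, and the container list is taken from the docker stop command above.

```python
import subprocess

# Container names used in this tutorial (from the docker stop command above).
TUTORIAL_CONTAINERS = ("mysqlterm", "watcher", "connect", "mysql", "kafka", "zookeeper")


def leftover_containers(ps_output, expected=TUTORIAL_CONTAINERS):
    """Return the expected container names that still appear in ps_output.

    ps_output is assumed to be the text printed by
    `docker ps -a --format '{{.Names}}'`, one container name per line.
    """
    names = {line.strip() for line in ps_output.splitlines() if line.strip()}
    return [name for name in expected if name in names]


def list_container_names():
    """Run docker ps and return its output; requires the docker CLI on PATH."""
    result = subprocess.run(
        ["docker", "ps", "-a", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout
```

For example, leftover_containers(list_container_names()) returns an empty list once cleanup has succeeded; any names it returns can be passed to docker stop.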
Next steps
After completing the tutorial, consider the following next steps:
Explore the tutorial further.
Use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You might need to run a separate watch-topic command for each topic. Keep in mind that you cannot remove a row that is referenced by a foreign key.
Try running the tutorial with Debezium connectors for Postgres, MongoDB, SQL Server, and Oracle.
You can use the Docker Compose version of this tutorial located in the Debezium examples repository. Docker Compose files are provided for running the tutorial with MySQL, Postgres, MongoDB, SQL Server, and Oracle.
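When you modify and remove rows, the op field in each event value tells you what happened: c for create, u for update, d for delete, and r for rows read during a snapshot. The following Python sketch summarizes one event value as printed by watch-topic. It assumes the JSON converter with schemas enabled, as in the output shown earlier, and treats a null value as the tombstone that Debezium emits after a delete by default. The function name and the shape of the returned dictionary are hypothetical.

```python
import json

# Debezium op codes for row-level change events.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change_event(value_json):
    """Summarize one change-event value printed by watch-topic.

    Assumes the JSON converter with schemas enabled, so the value has
    top-level "schema" and "payload" fields. Returns None for a
    tombstone (a null value), which follows a delete event by default.
    """
    event = json.loads(value_json)
    if event is None:
        return None
    # With schemas disabled, the value is the envelope itself.
    payload = event.get("payload", event)
    source = payload.get("source") or {}
    op = payload.get("op")
    return {
        "operation": OPERATIONS.get(op, op),  # unknown codes are passed through
        "table": f"{source.get('db')}.{source.get('table')}",
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


if __name__ == "__main__":
    value = (
        '{"schema":{},"payload":{"before":null,'
        '"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson",'
        '"email":"kitt@acme.com"},'
        '"source":{"db":"inventory","table":"customers"},'
        '"op":"c","ts_ms":1490635181455}}'
    )
    print(summarize_change_event(value)["operation"])  # create
```

For an update event, both before and after are populated, so you can compare them to see which columns changed; for a delete event, after is null.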