Deploying Debezium on OpenShift

Debezium Deployment

For setting up Kafka and Kafka Connect on OpenShift, a set of images provided by the Strimzi project can be used, which offers “Kafka as a Service”. It consists of enterprise grade configuration files and images that bring Kafka to OpenShift.

First, install the operators and templates for the Kafka broker and Kafka Connect into our OpenShift project:

  1. export STRIMZI_VERSION=0.13.0
  2. git clone -b $STRIMZI_VERSION https://github.com/strimzi/strimzi-kafka-operator
  3. cd strimzi-kafka-operator
  4. # We need to create security objects as part of installation so it is necessary to switch to admin user
  5. oc login -u system:admin
  6. oc create -f install/cluster-operator && oc create -f examples/templates/cluster-operator

Next, deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the Debezium connectors installed:

  1. # Deploy an ephemeral single instance Kafka broker
  2. oc process strimzi-ephemeral -p CLUSTER_NAME=broker -p ZOOKEEPER_NODE_COUNT=1 -p KAFKA_NODE_COUNT=1 -p KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 | oc apply -f -
  3. # Deploy a single instance of Kafka Connect with no plug-in installed
  4. oc process strimzi-connect-s2i -p CLUSTER_NAME=debezium -p KAFKA_CONNECT_BOOTSTRAP_SERVERS=broker-kafka-bootstrap:9092 -p KAFKA_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 -p KAFKA_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 -p KAFKA_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 -p KAFKA_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false -p KAFKA_CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false | oc apply -f -
  5. # Build a Debezium image
  6. export DEBEZIUM_VERSION=1.1.2.Final
  7. mkdir -p plugins && cd plugins && \
  8. for PLUGIN in {mongodb,mysql,postgres}; do \
  9. curl http://central.maven.org/maven2/io/debezium/debezium-connector-$PLUGIN/$DEBEZIUM_VERSION/debezium-connector-$PLUGIN-$DEBEZIUM_VERSION-plugin.tar.gz | tar xz; \
  10. done && \
  11. oc start-build debezium-connect --from-dir=. --follow && \
  12. cd .. && rm -rf plugins

After a while all parts should be up and running:

  1. oc get pods
  2. NAME READY STATUS RESTARTS AGE
  3. broker-entity-operator-5fb7bc8b9b-r86nz 3/3 Running 1 4m
  4. broker-kafka-0 2/2 Running 0 4m
  5. broker-zookeeper-0 2/2 Running 0 5m
  6. debezium-connect-3-4sdjr 1/1 Running 0 1m
  7. strimzi-cluster-operator-d77476b8f-rblqf 1/1 Running 0 5m

Alternatively, you can go to the “Pods” view of your OpenShift Web Console (https://myhost:8443/console/project/myproject/browse/pods) to confirm all pods are up and running:

openshift pods

Verifying the Deployment

Verify whether the deployment is correct by emulating the Debezium Tutorial in the OpenShift environment.

Begin by starting a MySQL server instance that contains some example tables:

  1. # Deploy pre-populated MySQL instance
  2. oc new-app --name=mysql debezium/example-mysql:1.1
  3. # Configure credentials for the database
  4. oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw

A new pod with MySQL server should be up and running:

  1. oc get pods
  2. NAME READY STATUS RESTARTS AGE
  3. ...
  4. mysql-1-4503l 1/1 Running 0 2s
  5. mysql-1-deploy 1/1 Running 0 4s
  6. ...

Register the Debezium MySQL connector to run against the deployed MySQL instance:

  1. oc exec -i -c kafka broker-kafka-0 -- curl -X POST \
  2. -H "Accept:application/json" \
  3. -H "Content-Type:application/json" \
  4. http://debezium-connect-api:8083/connectors -d @- <<'EOF'
  5. {
  6. "name": "inventory-connector",
  7. "config": {
  8. "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  9. "tasks.max": "1",
  10. "database.hostname": "mysql",
  11. "database.port": "3306",
  12. "database.user": "debezium",
  13. "database.password": "dbz",
  14. "database.server.id": "184054",
  15. "database.server.name": "dbserver1",
  16. "database.whitelist": "inventory",
  17. "database.history.kafka.bootstrap.servers": "broker-kafka-bootstrap:9092",
  18. "database.history.kafka.topic": "schema-changes.inventory"
  19. }
  20. }
  21. EOF

Kafka Connect’s log file should contain messages regarding execution of the initial snapshot:

  1. oc logs $(oc get pods -o name -l strimzi.io/name=debezium-connect)

Now you can read change events for the customers table from the corresponding Kafka topic:

  1. oc exec -it broker-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
  2. --bootstrap-server localhost:9092 \
  3. --from-beginning \
  4. --property print.key=true \
  5. --topic dbserver1.inventory.customers

You should see an output like the following (formatted for the sake of readability):

  1. # Message 1
  2. {
  3. "id": 1001
  4. }
  5. # Message 1 Value
  6. {
  7. "before": null,
  8. "after": {
  9. "id": 1001,
  10. "first_name": "Sally",
  11. "last_name": "Thomas",
  12. "email": "sally.thomas@acme.com"
  13. },
  14. "source": {
  15. "name": "dbserver1",
  16. "server_id": 0,
  17. "ts_sec": 0,
  18. "gtid": null,
  19. "file": "mysql-bin.000003",
  20. "pos": 154,
  21. "row": 0,
  22. "snapshot": true,
  23. "thread": null,
  24. "db": "inventory",
  25. "table": "customers"
  26. },
  27. "op": "c",
  28. "ts_ms": 1509530901446
  29. }
  30. # Message 2 Key
  31. {
  32. "id": 1002
  33. }
  34. # Message 2 Value
  35. {
  36. "before": null,
  37. "after": {
  38. "id": 1002,
  39. "first_name": "George",
  40. "last_name": "Bailey",
  41. "email": "gbailey@foobar.com"
  42. },
  43. "source": {
  44. "name": "dbserver1",
  45. "server_id": 0,
  46. "ts_sec": 0,
  47. "gtid": null,
  48. "file": "mysql-bin.000003",
  49. "pos": 154,
  50. "row": 0,
  51. "snapshot": true,
  52. "thread": null,
  53. "db": "inventory",
  54. "table": "customers"
  55. },
  56. "op": "c",
  57. "ts_ms": 1509530901446
  58. }
  59. ...

Finally, the next example modifies some records in the customers table of the database:

  1. oc exec -it $(oc get pods -o custom-columns=NAME:.metadata.name --no-headers -l app=mysql) \
  2. -- bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
  3. # For example, run UPDATE customers SET email="sally.thomas@example.com" WHERE ID = 1001;

You should now see additional change messages in the consumer started before.

If you have any questions or requests related to running Debezium on OpenShift, let us know via our user group or in the Debezium developer’s chat.