Debezium Server

Installation

To install the server, download and unpack the server distribution archive.

A directory named debezium-server will be created with these contents:

  debezium-server/
  |-- CHANGELOG.md
  |-- conf
  |-- CONTRIBUTE.md
  |-- COPYRIGHT.txt
  |-- debezium-server-2.0.0.Final-runner.jar
  |-- lib
  |-- LICENSE-3rd-PARTIES.txt
  |-- LICENSE.txt
  |-- README.md
  `-- run.sh

The server is started with the run.sh script. Dependencies are stored in the lib directory, and the conf directory contains configuration files.

If you use the Oracle connector, you must add the Oracle JDBC driver to the lib directory (and, if you use XStream, the XStream API files as well), as explained in Obtaining the Oracle JDBC driver and XStream API files.

Configuration

Debezium Server uses MicroProfile Configuration for configuration. This means that the application can be configured from several sources, such as configuration files, environment variables, and system properties.

The main configuration file is conf/application.properties. It is organized into several sections, each identified by a property prefix:

  • debezium.source is for source connector configuration; each instance of Debezium Server runs exactly one connector

  • debezium.sink is for the sink system configuration

  • debezium.format is for the output serialization format configuration

  • debezium.transforms is for the configuration of message transformations

An example configuration file might look like this:

  debezium.sink.type=kinesis
  debezium.sink.kinesis.region=eu-central-1
  debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
  debezium.source.offset.storage.file.filename=data/offsets.dat
  debezium.source.offset.flush.interval.ms=0
  debezium.source.database.hostname=localhost
  debezium.source.database.port=5432
  debezium.source.database.user=postgres
  debezium.source.database.password=postgres
  debezium.source.database.dbname=postgres
  debezium.source.topic.prefix=tutorial
  debezium.source.schema.include.list=inventory

When the server is started, it generates a sequence of log messages like this:

  __ ____ __ _____ ___ __ ____ ______
  --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
  -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
  --\___\_\____/_/ |_/_/|_/_/|_|\____/___/
  2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
  2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
  2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
  2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
  converter.type = key
  decimal.format = BASE64
  schemas.cache.size = 1000
  schemas.enable = true
  2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
  converter.type = value
  decimal.format = BASE64
  schemas.cache.size = 1000
  schemas.enable = false
  2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
  access.control.allow.methods =
  access.control.allow.origin =
  admin.listeners = null
  bootstrap.servers = [localhost:9092]
  client.dns.lookup = default
  config.providers = []
  connector.client.config.override.policy = None
  header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
  internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
  internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
  key.converter = class org.apache.kafka.connect.json.JsonConverter
  listeners = null
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  offset.flush.interval.ms = 0
  offset.flush.timeout.ms = 5000
  offset.storage.file.filename = data/offsets.dat
  offset.storage.partitions = null
  offset.storage.replication.factor = null
  offset.storage.topic =
  plugin.path = null
  rest.advertised.host.name = null
  rest.advertised.listener = null
  rest.advertised.port = null
  rest.extension.classes = []
  rest.host.name = null
  rest.port = 8083
  ssl.client.auth = none
  task.shutdown.graceful.timeout.ms = 5000
  topic.tracking.allow.reset = true
  topic.tracking.enable = true
  value.converter = class org.apache.kafka.connect.json.JsonConverter
  2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
  2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
  2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
  converter.type = key
  decimal.format = BASE64
  schemas.cache.size = 1000
  schemas.enable = true
  2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
  converter.type = value
  decimal.format = BASE64
  schemas.cache.size = 1000
  schemas.enable = true
  2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
  2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
  2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
  2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.postgresql.PostgresConnector
  2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
  2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = postgres
  2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = postgres
  2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
  2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = localhost
  2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
  2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
  2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) topic.prefix = tutorial
  2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 5432
  2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.include.list = inventory
  2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
  2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
  2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]

Source configuration

The source configuration uses the same configuration properties that are described on the specific connector documentation pages (prefixed with debezium.source), together with a few additional properties that are necessary for running outside of Kafka Connect:

Property | Default | Description

The name of the Java class implementing the source connector.

org.apache.kafka.connect.storage.FileOffsetBackingStore

Class to use for storing and retrieving offsets for non-Kafka deployments. To use Redis to store offsets, use io.debezium.server.redis.RedisOffsetBackingStore.

If using a file offset store (default), the file in which connector offsets are stored for non-Kafka deployments.

Defines how frequently the offsets are flushed into the file.

(Optional) If using Redis to store offsets, an address, formatted as host:port, at which the Redis target streams are provided. If not supplied, the server attempts to read debezium.sink.redis.address.

(Optional) If using Redis to store offsets, a user name used to communicate with Redis. If the redis.address configuration is not supplied and the address is taken from the Redis sink, the server attempts to load the value from debezium.sink.redis.user.

(Optional) If using Redis to store offsets, a password (of the respective user) used to communicate with Redis. A password must be set if a user is set. If the redis.address configuration is not supplied and the address is taken from the Redis sink, the server attempts to load the value from debezium.sink.redis.password.

(Optional) If using Redis to store offsets, whether or not to use SSL to communicate with Redis. If the redis.address configuration is not supplied and the address is taken from the Redis sink, the server attempts to load the value from debezium.sink.redis.ssl.enabled. The default is false.

(Optional) If using Redis to store offsets, the hash key in Redis under which the offsets are stored. If the redis.key configuration is not supplied, the default value metadata:debezium:offsets is used.

io.debezium.storage.kafka.history.KafkaSchemaHistory

Some of the connectors (e.g., MySQL, SQL Server, Db2, Oracle) track the database schema evolution over time and store this data in a database schema history. By default, the schema history is based on Kafka. Other options are also available:

  • io.debezium.storage.file.history.FileSchemaHistory for non-Kafka deployments

  • io.debezium.relational.history.MemorySchemaHistory volatile store for test environments

  • io.debezium.server.redis.RedisSchemaHistory for Redis deployments

The name and location of the file to which FileSchemaHistory persists its data.

The Redis host:port to connect to if using RedisSchemaHistory.

The Redis user to use if using RedisSchemaHistory.

The Redis password to use if using RedisSchemaHistory.

Use SSL connection if using RedisSchemaHistory.

The Redis key to use for storage if using RedisSchemaHistory. Default: metadata:debezium:schema_history

The initial delay in case of a connection retry to Redis if using RedisSchemaHistory. Default: 300 (ms)

The maximum delay in case of a connection retry to Redis if using RedisSchemaHistory. Default: 10000 (ms)

Connection timeout of Redis client if using RedisSchemaHistory. Default: 2000 (ms)

Socket timeout of Redis client if using RedisSchemaHistory. Default: 2000 (ms)
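
As a rough illustration of the options above, the following sketch configures a MySQL connector to keep both its offsets and its schema history in local files. The property names debezium.source.schema.history.internal and debezium.source.schema.history.internal.file.filename, as well as the file paths, are assumptions made for this example, because the property column of the table did not survive in this copy. Check them against the reference for your Debezium version.

```properties
# Sketch only: file-based offset and schema history storage for a non-Kafka deployment.
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
# Offsets are kept in a local file (the default offset store).
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
# Assumed property names: the schema history implementation and its backing file.
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
```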

Format configuration

The message output format can be configured for both key and value separately. By default the output is in JSON format but an arbitrary implementation of Kafka Connect’s Converter can be used.

Property | Default | Description

json

The name of the output format for key, one of json/avro/protobuf.

Configuration properties passed to the key converter.

json

The name of the output format for value, one of json/avro/protobuf/cloudevents.

Configuration properties passed to the value converter.
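
A minimal sketch of a format configuration follows. It assumes that the key and value formats are set through debezium.format.key and debezium.format.value and that converter options are appended to those prefixes; these names are inferred from the debezium.format section prefix and are not confirmed by this copy of the table. The schemas.enable option is the JsonConverter setting that also appears in the startup log.

```properties
# Assumed property names: debezium.format.key and debezium.format.value.
debezium.format.key=json
debezium.format.value=json
# Passed through to the value converter: emit only the payload, without the schema part.
debezium.format.value.schemas.enable=false
```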

Transformation configuration

Before the messages are delivered to the sink, they can run through a sequence of transformations. The server supports the single message transformations (SMTs) defined by Kafka Connect. The configuration must contain the list of transformations, the implementation class for each transformation, and the configuration options for each transformation.

Property | Default | Description

debezium.transforms

The comma-separated list of symbolic names of transformations.

debezium.transforms.<name>.type

The name of the Java class implementing the transformation with name <name>.

debezium.transforms.<name>.*

Configuration properties passed to the transformation with name <name>.
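
The three properties above could be combined as in the following sketch. It uses the RegexRouter transformation that ships with Kafka Connect. The symbolic name route and the regular expression are illustrative choices for the tutorial topic prefix used earlier, not values taken from the Debezium documentation.

```properties
# One transformation, registered under the symbolic name "route".
debezium.transforms=route
debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Backslashes are doubled because this is a Java properties file.
debezium.transforms.route.regex=tutorial\\.inventory\\.(.*)
debezium.transforms.route.replacement=inventory-$1
```

With this configuration, a destination such as tutorial.inventory.customers would be rewritten to inventory-customers before the record reaches the sink.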

Additional configuration

Debezium Server runs on top of the Quarkus framework. All configuration options exposed by Quarkus are available in Debezium Server too. The most frequently used are:

Property | Default | Description

quarkus.http.port

8080

The port on which Debezium exposes the MicroProfile Health endpoint and other status information.

quarkus.log.level

INFO

The default log level for every log category.

quarkus.log.console.json

true

Determines whether to enable the JSON console formatting extension, which disables “normal” console formatting.

JSON logging can be disabled by setting quarkus.log.console.json=false in the conf/application.properties file, as demonstrated in the conf/application.properties.example file.
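
For example, the following fragment of conf/application.properties overrides the three options listed above. The port number and log level are arbitrary example values.

```properties
# Serve the health endpoint on a non-default port.
quarkus.http.port=8088
# Raise log verbosity for troubleshooting.
quarkus.log.level=DEBUG
# Switch from JSON to plain-text console logging.
quarkus.log.console.json=false
```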

Enabling message filtering

Debezium Server provides a filter SMT capability; see Message Filtering for more details. However, for security reasons it is not enabled by default and must be explicitly enabled when Debezium Server is started. To enable it, set the environment variable ENABLE_DEBEZIUM_SCRIPTING to true. This adds the debezium-scripting jar file and the jar files of the JSR 223 implementations (currently Groovy and graalvm.js) to the server class path. These jar files are contained in the opt_lib directory of the Debezium Server distribution.
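
Once scripting is enabled, a filter could be configured along the following lines. This is a sketch that assumes the io.debezium.transforms.Filter class and its language and condition options as described in the Message Filtering documentation. The symbolic name filter and the Groovy expression are illustrative only.

```properties
# Requires ENABLE_DEBEZIUM_SCRIPTING=true in the environment in which run.sh is started.
debezium.transforms=filter
debezium.transforms.filter.type=io.debezium.transforms.Filter
debezium.transforms.filter.language=jsr223.groovy
# Illustrative Groovy condition: forward only update events.
debezium.transforms.filter.condition=value.op == 'u'
```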

Sink configuration

Sink configuration is specific to each sink type.

The sink is selected by the configuration property debezium.sink.type.

Amazon Kinesis

Amazon Kinesis is a data streaming system with support for stream sharding and other techniques for high scalability. Kinesis exposes a set of REST APIs and provides SDKs for Java and other languages; the Java SDK is used to implement the sink.

Property | Default | Description

Must be set to kinesis.

A region name in which the Kinesis target streams are provided.

Endpoint determined by the AWS SDK

(Optional) An endpoint URL at which the Kinesis target streams are provided.

default

A credentials profile name used to communicate with the Amazon API.

default

Kinesis does not support the notion of messages without a key, so this string is used as the message key for messages from tables without a primary key.

Injection points

The Kinesis sink behaviour can be modified by custom logic that provides alternative implementations for specific functionalities. When the alternative implementations are not available, the default ones are used.

Interface | CDI classifier | Description

@CustomConsumerBuilder

Custom configured instance of a KinesisClient used to send messages to target streams.

Custom implementation maps the planned destination (topic) name into a physical Kinesis stream name. By default the same name is used.

Google Cloud Pub/Sub

Google Cloud Pub/Sub is a messaging/eventing system designed for scalable batch and stream processing applications. Pub/Sub exposes a set of REST APIs and provides SDKs for Java and other languages; the Java SDK is used to implement the sink.

Property | Default | Description

Must be set to pubsub.

system-wide default project id

A project name in which the target topics are created.

true

Pub/Sub can optionally use a message key to guarantee that messages with the same ordering key are delivered in the same order in which they were sent. This feature can be disabled.

default

Tables without a primary key send messages with a null key. This is not supported by Pub/Sub, so a surrogate key must be used.

100

The maximum amount of time to wait to reach element count or request bytes threshold before publishing outstanding messages to Pub/Sub.

100L

Once this many messages are queued, send all of the messages in a single call, even if the delay threshold hasn’t elapsed yet.

10000000L

Once the number of bytes in the batched request reaches this threshold, send all of the messages in a single call, even if neither the delay nor the message count threshold has been exceeded yet.

false

When enabled, configures your publisher client with flow control to limit the rate of publish requests.

Long.MAX_VALUE

(Optional) If flow control is enabled, the maximum number of outstanding messages before further messages are blocked from being published.

Long.MAX_VALUE

(Optional) If flow control is enabled, the maximum number of outstanding bytes before further messages are blocked from being published.

60000

The total timeout for a call to publish (including retries) to Pub/Sub.

5

The initial amount of time to wait before retrying the request.

2.0

The previous wait time is multiplied by this multiplier to come up with the next wait time, until the max is reached.

Long.MAX_VALUE

The maximum amount of time to wait before retrying; that is, after this value is reached, the wait time is not increased further by the multiplier.

10000

Controls the timeout for the initial Remote Procedure Call (RPC).

2.0

The previous RPC timeout is multiplied by this multiplier to come up with the next RPC timeout value, until the max is reached.

10000

The max timeout for individual publish requests to Cloud Pub/Sub.

The address of the Pub/Sub emulator. Use it only in a development or test environment with the Pub/Sub emulator. Unless this value is set, debezium-server connects to a Cloud Pub/Sub instance running in a GCP project, which is the desired behavior in a production environment.
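
The following sketch shows how a development setup against a local emulator might look. The property names debezium.sink.pubsub.project.id, debezium.sink.pubsub.ordering.enabled, and debezium.sink.pubsub.address are assumptions, because the property column of the table is missing in this copy. The project name and emulator address are placeholders.

```properties
debezium.sink.type=pubsub
# Assumed property names; verify them against the reference for your version.
debezium.sink.pubsub.project.id=my-project
debezium.sink.pubsub.ordering.enabled=true
# Development only: point the sink at a local Pub/Sub emulator instead of Google Cloud.
debezium.sink.pubsub.address=localhost:8085
```

Leave the emulator address unset in production so that the sink connects to the real service.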

Injection points

The Pub/Sub sink behaviour can be modified by custom logic that provides alternative implementations for specific functionalities. When the alternative implementations are not available, the default ones are used.

Interface | CDI classifier | Description

@CustomConsumerBuilder

A class that provides custom configured instance of a Publisher used to send messages to a dedicated topic.

Custom implementation maps the planned destination (topic) name into a physical Pub/Sub topic name. By default the same name is used.

Pub/Sub Lite

Google Cloud Pub/Sub Lite is a cost-effective alternative to Google Cloud Pub/Sub. Pub/Sub Lite exposes a set of REST APIs and provides SDKs for Java and other languages; the Java SDK is used to implement the sink.

Property | Default | Description

Must be set to pubsublite.

system-wide default project id

A project name or project id in which the target topics are created.

Region where the topics are created. Example: us-east1-b.

true

Pub/Sub Lite can optionally use a message key to guarantee that messages with the same key are delivered to the same partition. This feature can be disabled.

default

Tables without a primary key send messages with a null key. This is not supported by Pub/Sub Lite, so a surrogate key must be used.

Injection points

The Pub/Sub Lite sink behaviour can be modified by custom logic that provides alternative implementations for specific functionalities. When the alternative implementations are not available, the default ones are used.

Interface | CDI classifier | Description

@CustomConsumerBuilder

A class that provides custom configured instance of a Publisher used to send messages to a dedicated topic.

Custom implementation maps the planned destination (topic) name into a physical Pub/Sub Lite topic name. By default the same name is used.

HTTP Client

The HTTP client sink streams changes to any HTTP server for additional processing. Its original design goal was to have Debezium act as a Knative event source.

Property | Default | Description

Must be set to http.

The HTTP Server URL to stream events to. This can also be set by defining the K_SINK environment variable, which is used by the Knative source framework.

60000

The number of milliseconds to wait for a response from the server before timing out (default of 60 s).

5

The number of retries before an exception is thrown (default of 5).

1000

The number of milliseconds to wait after a failure before another attempt to send a record is made (default of 1 s).
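
A minimal HTTP sink configuration might look like the sketch below. The property name debezium.sink.http.url is an assumption, because the property column is missing in this copy, and the endpoint is a placeholder. When running as a Knative source, the K_SINK environment variable would supply the URL instead.

```properties
debezium.sink.type=http
# Assumed property name; the target URL is a placeholder for your own HTTP server.
debezium.sink.http.url=http://localhost:8081/events
```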

Apache Pulsar

Apache Pulsar is a high-performance, low-latency server for server-to-server messaging. Pulsar exposes REST APIs and a native endpoint, and provides clients for Java and other languages; the Java client is used to implement the sink.

Property | Default | Description

Must be set to pulsar.

The Pulsar module supports pass-through configuration. The client configuration properties are passed to the client with the prefix removed. At least serviceUrl must be provided.

The Pulsar module supports pass-through configuration. The message producer configuration properties are passed to the producer with the prefix removed. The topic is set by Debezium.

default

Tables without a primary key send messages with a null key. This is not supported by Pulsar, so a surrogate key must be used.
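
To illustrate the pass-through mechanism, the sketch below assumes that the client prefix is debezium.sink.pulsar.client. (the prefix itself is not shown in this copy of the table). The service URL is a placeholder for a local broker.

```properties
debezium.sink.type=pulsar
# Assumed prefix: everything after "debezium.sink.pulsar.client." is handed to the Pulsar client,
# so this line sets the client's serviceUrl option.
debezium.sink.pulsar.client.serviceUrl=pulsar://localhost:6650
```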

Injection points

The Pulsar sink behaviour can be modified by custom logic that provides alternative implementations for specific functionalities. When the alternative implementations are not available, the default ones are used.

Interface | CDI classifier | Description

Custom implementation maps the planned destination (topic) name into a physical Pulsar topic name. By default the same name is used.

Azure Event Hubs

Azure Event Hubs is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.

Property | Default | Description

Must be set to eventhubs.

Connection string required to communicate with Event Hubs. The format is: Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>

Name of the Event Hub.

(Optional) The identifier of the Event Hub partition that the events will be sent to. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified debezium.sink.eventhubs.partitionkey.

(Optional) The partition key used to hash the events. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified debezium.sink.eventhubs.partitionid.

Sets the maximum size for the batch of events, in bytes.
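
An Event Hubs sink could be configured as in the following sketch. The property names debezium.sink.eventhubs.connectionstring and debezium.sink.eventhubs.hubname are assumptions (only partitionid and partitionkey are named in this copy). The hub name and partition key are placeholders, and the connection string keeps the placeholders from the format shown above.

```properties
debezium.sink.type=eventhubs
# Assumed property names for the connection string and hub name.
debezium.sink.eventhubs.connectionstring=Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>
debezium.sink.eventhubs.hubname=my-hub
# Set either partitionkey or partitionid, never both.
debezium.sink.eventhubs.partitionkey=inventory
```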

Injection points

The default sink behaviour can be modified by custom logic that provides alternative implementations for specific functionalities. When the alternative implementations are not available, the default ones are used.

Interface | CDI classifier | Description

@CustomConsumerBuilder

Custom configured instance of an EventHubProducerClient used to send messages.

Redis (Stream)

Redis is an open source (BSD licensed) in-memory data structure store, used as a database, cache and message broker. The Stream is a data type which models a log data structure in a more abstract way. It implements powerful operations to overcome the limitations of a log file.

Property | Default | Description

Must be set to redis.

An address, formatted as host:port, at which the Redis target streams are provided.

(Optional) A user name used to communicate with Redis.

(Optional) A password (of respective user) used to communicate with Redis. A password must be set if a user is set.

(Optional) Use SSL to communicate with Redis. The default is false.

default

Redis does not support the notion of data without a key, so this string is used as the key for records without a primary key.

default

Redis does not support the notion of null payloads, as is the case with tombstone events, so this string is used as the value for records without a payload.

500

Number of change records to insert in a single batch write (Pipelined transaction).

300

Initial retry delay when encountering Redis connection or OOM issues. This value is doubled upon every retry but does not exceed debezium.sink.redis.retry.max.delay.ms.

10000

Max delay when encountering Redis connection or OOM issues.

2000

Connection timeout for Redis client.

2000

Socket timeout for Redis client.

compact

The format of the message sent to the Redis stream. Possible values are extended (the newer format) and compact (the older format, used until now). Read more about the message format below.
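
A minimal Redis sink configuration, using only property names that appear in this document, might look like the following sketch. The address is a placeholder for a local Redis instance.

```properties
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
# Opt in to the newer message layout; the default is compact.
debezium.sink.redis.message.format=extended
```

With the default retry settings listed above (initial delay 300 ms, doubled on every retry, capped at 10000 ms), successive retry delays would be 300, 600, 1200, 2400, 4800, and 9600 ms, and 10000 ms from then on.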

Message Format

The debezium.sink.redis.message.format property described above selects one of two message formats, which look like this in Redis:

  • the extended format, using two pairs {1), 2)}={“key”, “message key”} and {3), 4)}={“value”, “message value”}:
  1) 1) "1639304527499-0"
     2) 1) "key"
        2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
        3) "value"
        4) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"
  • and the compact format, using only one pair {1), 2)}={“message key”, “message value”}:
  1) 1) "1639304527499-0"
     2) 1) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
        2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"

You can read more about Redis Streams here.

Injection points

The behavior of the Redis sink can be modified by custom logic that provides alternative implementations of specific functionality. When no alternative implementation is available, the default one is used.

Interface | CDI classifier | Description

Custom implementation maps the planned destination (topic) name into a physical Redis stream name. By default the same name is used.

NATS Streaming

NATS Streaming is a data streaming system powered by NATS and written in the Go programming language.

Property | Default | Description

Must be set to nats-streaming.

URL (or comma-separated list of URLs) of a node or nodes in the cluster, formatted as nats://host:port.

NATS Streaming Cluster ID.

NATS Streaming Client ID.
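As a rough illustration of the four settings described above, a NATS Streaming sink configuration might look like the following sketch. The property keys are assumptions that follow the debezium.sink.<type>.* naming pattern (they are not shown in the table as reproduced here), and the URL, cluster ID, and client ID values are placeholders; check the keys against the reference for your Debezium Server release.

```properties
# Selects the NATS Streaming sink.
debezium.sink.type=nats-streaming
# Assumed key names; the values below are placeholders for a local test setup.
debezium.sink.nats-streaming.url=nats://localhost:4222
debezium.sink.nats-streaming.cluster.id=test-cluster
debezium.sink.nats-streaming.client.id=debezium-server
```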

Injection points

The behavior of the NATS Streaming sink can be modified by custom logic that provides alternative implementations of specific functionality. When no alternative implementation is available, the default one is used.

Interface | CDI classifier | Description

@CustomConsumerBuilder

Custom configured instance of a StreamingConnection used to publish messages to target subjects.

Custom implementation maps the planned destination (topic) name into a physical NATS Streaming subject name. By default the same name is used.

Apache Kafka

Apache Kafka is a popular open-source platform for distributed event streaming. Debezium Server supports publishing captured change events to a configured Kafka message broker.

Property | Default | Description

Must be set to kafka.

The Kafka sink adapter supports pass-through configuration: every Kafka producer configuration property set under the sink's prefix is passed to the producer with the prefix removed. At least the bootstrap.servers, key.serializer, and value.serializer properties must be provided. The topic is set by Debezium.
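A minimal sketch of the pass-through mechanism, assuming the prefix is debezium.sink.kafka.producer. (the prefix itself is not spelled out in the text above, so treat it as an assumption). The broker address and serializer choices are placeholders.

```properties
# Selects the Kafka sink.
debezium.sink.type=kafka
# Assumed pass-through prefix: debezium.sink.kafka.producer.
# The producer would receive bootstrap.servers, key.serializer, and value.serializer.
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

Any other producer setting, such as acks or compression.type, could be supplied the same way under the same prefix.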

Pravega

Pravega is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a Debezium batch to Pravega. The transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch is completed.

The Pravega sink expects the destination scope and streams to already exist; it does not create them.

Property | Default | Description

Must be set to pravega.

Default: tcp://localhost:9090. The connection string to a Controller in the Pravega cluster.

The name of the scope in which to find the destination streams.

Default: false. Set to true to have the sink use Pravega transactions for each Debezium batch.
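The settings above could be combined as in the following sketch, which enables the transactional mode. The property keys are assumptions based on the debezium.sink.<type>.* naming pattern, and the scope name is a placeholder for a scope that must already exist in the Pravega cluster.

```properties
# Selects the Pravega sink.
debezium.sink.type=pravega
# Assumed key names; the controller URI shown is the documented default.
debezium.sink.pravega.controller.uri=tcp://localhost:9090
# Placeholder scope name; the scope and its streams must be created beforehand.
debezium.sink.pravega.scope=debezium
# Write each Debezium batch inside a Pravega transaction.
debezium.sink.pravega.transaction=true
```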

Injection points

The behavior of the Pravega sink can be modified by custom logic that provides alternative implementations of specific functionality. When no alternative implementation is available, the default one is used.

Interface | CDI classifier | Description

Custom implementation maps the planned destination (stream) name into a physical Pravega stream name. By default the same name is used.

Extensions

Debezium Server uses the Quarkus framework and relies on dependency injection to enable developers to extend its behavior. Note that only the JVM mode of Quarkus is supported; native execution via GraalVM is not. The server can be extended with custom logic in two ways:

  • implementation of a new sink

  • customization of an existing sink - i.e. non-standard configuration

Implementation of a new sink

A new sink is implemented as a CDI bean that implements the DebeziumEngine.ChangeConsumer interface, is annotated with @Named and a unique name, and has @Dependent scope. The name of the bean is the value used for the debezium.sink.type option.

The sink needs to read its configuration using the MicroProfile Config API. Its execution path must pass the messages to the target system and regularly commit the messages that have been passed on and processed.
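To illustrate how the bean name ties into the configuration, the sketch below selects a custom sink whose bean is assumed to be annotated with @Named("mysink"). Both the name mysink and the debezium.sink.mysink.endpoint key are hypothetical; the bean itself would read such a key through the MicroProfile Config API.

```properties
# Hypothetical: "mysink" is the @Named value of a custom ChangeConsumer bean.
debezium.sink.type=mysink
# Hypothetical sink-specific setting, read by the custom bean via MicroProfile Config.
debezium.sink.mysink.endpoint=http://localhost:8080/events
```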

See the Kinesis sink implementation for further details.

Customization of an existing sink

Some sinks expose dependency injection points that let users provide their own beans to modify the behavior of the sink. Typical examples are fine-tuning the target client setup and customizing destination naming.

See an example of a custom topic naming policy implementation for further details.