Avro Serialization
Apicurio API and Schema Registry
About the Apicurio API and Schema Registry
The Apicurio Registry open-source project provides several components that work with Avro:
An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
An API and schema registry that tracks:
Avro schemas that are used in Kafka topics.
Where the Avro converter sends the generated Avro schemas.
Because the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.
Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
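For example, a consumer application might use the Apicurio Avro deserializer as follows. This is a minimal sketch: it assumes the Apicurio Avro Serde library (apicurio-registry-serdes-avro-serde) is on the classpath, and the broker address, group ID, topic name, and registry URL are illustrative.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ChangeEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");        // illustrative broker address
        props.put("group.id", "avro-change-event-consumer"); // illustrative group ID
        // The Apicurio deserializer resolves the schema ID embedded in each record
        // against the registry and decodes the Avro binary payload.
        props.put("key.deserializer", "io.apicurio.registry.serde.avro.AvroKafkaDeserializer");
        props.put("value.deserializer", "io.apicurio.registry.serde.avro.AvroKafkaDeserializer");
        props.put("apicurio.registry.url", "http://apicurio:8080/apis/registry/v2");
        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("dbserver1.inventory.customers")); // illustrative topic
            while (true) {
                ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}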
To use the Apicurio Registry with Debezium, add Apicurio Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.
The Apicurio Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
To use converters provided by Apicurio Registry, you need to provide the location of the registry through the apicurio.registry.url property.
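For example, to use the JSON converter for record values, you might set properties like the following (a sketch; the registry URL is illustrative):
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true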
Apicurio Registry deployment overview
To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:
Deploy an Apicurio API and Schema Registry instance.
Install the Avro converter from the installation package into a plug-in directory. If you are using the Debezium Connect container image, it’s not necessary to install the package. For more information, see Deploying Apicurio Registry with Debezium containers.
Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
key.converter.apicurio.registry.find-latest=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=true
schema.name.adjustment.mode=avro
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
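For example, you might apply these properties when registering a connector through the Kafka Connect REST API. The following request is a minimal sketch that assumes a Debezium MySQL connector; the connector name, database coordinates, and topic prefix are illustrative:
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "schema.name.adjustment.mode": "avro"
  }
}'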
Deploying Apicurio Registry with Debezium containers
In your environment, you might want to use a provided Debezium container image to deploy Debezium connectors that use Avro serialization. In the following procedure, you enable Apicurio converters on the Debezium Kafka Connect container image, and configure a Debezium connector to use the Avro converter.
Prerequisites
You have Docker installed and sufficient rights to create and manage containers.
You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.
Procedure
Deploy an instance of Apicurio Registry.
The following example uses a non-production, in-memory Apicurio Registry instance:
docker run -it --rm --name apicurio \
-p 8080:8080 apicurio/apicurio-registry-mem:2.5.11.Final
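To confirm that the registry is running, you can query its REST API (an illustrative check; an empty result list simply means that no artifacts have been registered yet):
curl http://localhost:8080/apis/registry/v2/search/artifacts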
Run the Debezium container image for Kafka Connect, configuring it to provide the Avro converter by setting the ENABLE_APICURIO_CONVERTERS=true environment variable:
docker run -it --rm --name connect \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link apicurio:apicurio \
-e ENABLE_APICURIO_CONVERTERS=true \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
-e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
-e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \
-p 8083:8083 quay.io/debezium/connect:2.7
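After the container starts, you can verify that the Kafka Connect REST API is reachable before registering a connector (an illustrative check that returns the Connect version information):
curl -H "Accept:application/json" http://localhost:8083/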
Confluent Schema Registry
Confluent provides an alternative schema registry implementation.
Confluent Schema Registry deployment overview
For information about installing the standalone Confluent Schema Registry, see the Confluent Platform deployment documentation.
As an alternative, you can install the standalone Confluent Schema Registry as a container.
Deploying Confluent Schema Registry with Debezium containers
Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:
kafka-connect-avro-converter
kafka-connect-avro-data
kafka-avro-serializer
kafka-schema-serializer
kafka-schema-converter
kafka-schema-registry-client
common-config
common-utils
You can download the preceding files from the Confluent Maven repository.
The following additional JAR files must also be present in the Connect plug-in directory:
avro
commons-compress
failureaccess
guava
minimal-json
re2j
slf4j-api
snakeyaml
swagger-annotations
jackson-databind
jackson-core
jackson-annotations
jackson-dataformat-csv
logredactor
logredactor-metrics
You can download the preceding files from the Maven Central repository.
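As a sketch, you might fetch the artifacts with curl; the version numbers below are illustrative assumptions, so match them to your Confluent Platform and dependency versions:
# From the Confluent Maven repository (example version)
curl -fLO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/7.6.0/kafka-connect-avro-converter-7.6.0.jar
# From Maven Central (example version)
curl -fLO https://repo1.maven.org/maven2/org/apache/avro/avro/1.11.3/avro-1.11.3.jar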
The configuration of the Confluent Avro converter differs slightly from that of the Apicurio converter. In your Debezium connector configuration, specify the following properties:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Deploy an instance of the Confluent Schema Registry:
docker run -it --rm --name schema-registry \
--link zookeeper \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
-p 8081:8081 confluentinc/cp-schema-registry
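You can confirm that the registry is up by listing its registered subjects (an illustrative check; the list stays empty until a connector produces records):
curl http://localhost:8081/subjects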
Run a Kafka Connect image configured to use Avro:
docker run -it --rm --name connect \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-p 8083:8083 quay.io/debezium/connect:2.7
Run a console consumer that reads new Avro messages from the db.myschema.mytable topic and decodes them to JSON:
docker run -it --rm --name avro-consumer \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
quay.io/debezium/connect:2.7 \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--property print.key=true \
--formatter io.confluent.kafka.formatter.AvroMessageFormatter \
--property schema.registry.url=http://schema-registry:8081 \
--topic db.myschema.mytable
Naming
As stated in the Avro documentation, names must adhere to the following rules:
Start with [A-Za-z_]
Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, field.name.adjustment.mode, that you can set to avro if you have columns that do not adhere to Avro rules for names. Setting field.name.adjustment.mode to avro allows serialization of non-conformant fields without having to actually modify your schema.
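For example, with the following setting, a hypothetical source column named order-total would be serialized under the Avro-safe field name order_total, because the avro mode replaces characters that are not valid in Avro names with underscores:
field.name.adjustment.mode=avro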
Getting More Information
This post from the Debezium blog describes the concepts of serializers, converters, and other components, and discusses the advantages of using Avro. Some Kafka Connect converter details have slightly changed since that post was written.
For a complete example of using Avro as the message format for Debezium change data events, see MySQL and the Avro message format.