Avro Serialization
Technical Information
A system that wants to use Avro serialization needs to complete two steps:
Deploy a Schema Registry instance
Use these properties to configure the Kafka Connect instance:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Note: In addition to setting the key/value converters, it is strongly recommended to set the internal key/value converters to the JSON converter, which makes the stored configuration and offsets easier to analyze. Using the Avro converter for the internal converters is currently not possible due to a known issue.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
Debezium Docker Images
See the MySQL and the Avro message format tutorial for a quickstart with MySQL.
Deploy a Schema Registry instance:
docker run -it --rm --name schema-registry \
--link zookeeper \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
-p 8081:8081 confluentinc/cp-schema-registry
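As a quick sanity check, you can list the subjects known to the registry through its REST API; a freshly started registry returns an empty JSON array. This assumes the registry's port is published to the host as shown above:
# List all subjects currently registered with the Schema Registry
curl -s http://localhost:8081/subjects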
Run a Kafka Connect image configured to use Avro:
docker run -it --rm --name connect \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
-e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-p 8083:8083 debezium/connect:1.0
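With the Connect worker up, connectors are registered through its REST API on port 8083. The following request is a minimal sketch along the lines of the Debezium MySQL tutorial; the connector name, database credentials, and server name are placeholder values to adapt to your environment:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
Because the worker was configured with the Avro converters, the change events produced by this connector are written to Kafka in Avro format and their schemas are registered with the Schema Registry automatically.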
Run a console consumer that reads new Avro messages from the db.myschema.mytable topic and decodes them to JSON:
docker run -it --rm --name avro-consumer \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
debezium/connect:1.0 \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--property print.key=true \
--formatter io.confluent.kafka.formatter.AvroMessageFormatter \
--property schema.registry.url=http://schema-registry:8081 \
--topic db.myschema.mytable
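The schemas that the converter registered can also be inspected directly. Subject names follow the <topic>-key and <topic>-value convention, so the latest value schema for the example topic can be fetched with (again assuming the registry's port is published to the host):
# Fetch the latest value schema registered for the example topic
curl -s http://localhost:8081/subjects/db.myschema.mytable-value/versions/latest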
Naming
As stated in the Avro documentation, names must adhere to the following rules:
Start with [A-Za-z_]
Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not adhere to the Avro naming rules above. Debezium provides a configuration option, sanitize.field.names, that can be set to true if you have columns that do not adhere to these rules, allowing those fields to be serialized without having to actually modify your schema.
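For example (using a hypothetical column name, not one from the tutorial): a column named order-total contains a hyphen, which is not allowed in Avro names. With sanitization enabled in the connector configuration, non-conforming characters are replaced with an underscore, so the field is serialized as order_total:
# Connector configuration fragment: characters that are not valid in Avro
# names (e.g. the hyphen in the hypothetical column "order-total") are
# replaced with "_", so the field is serialized as "order_total".
sanitize.field.names=true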
Getting More Information
This post from the Debezium blog describes the concepts of serializers and converters, and discusses the advantages of using Avro.
For first steps with Kafka Connect, there’s a helpful quickstart in Confluent’s documentation.