Exporting CloudEvents

Example event format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.

  1. {
  2. "id" : "name:test_server;lsn:29274832;txId:565", (1)
  3. "source" : "/debezium/postgresql/test_server", (2)
  4. "specversion" : "1.0", (3)
  5. "type" : "io.debezium.postgresql.datachangeevent", (4)
  6. "time" : "2020-01-13T13:55:39.738Z", (5)
  7. "datacontenttype" : "application/json", (6)
  8. "iodebeziumop" : "r", (7)
  9. "iodebeziumversion" : "2.4.1.Final", (8)
  10. "iodebeziumconnector" : "postgresql",
  11. "iodebeziumname" : "test_server",
  12. "iodebeziumtsms" : "1578923739738",
  13. "iodebeziumsnapshot" : "true",
  14. "iodebeziumdb" : "postgres",
  15. "iodebeziumschema" : "s1",
  16. "iodebeziumtable" : "a",
  17. "iodebeziumlsn" : "29274832",
  18. "iodebeziumxmin" : null,
  19. "iodebeziumtxid": "565", (9)
  20. "iodebeziumtxtotalorder": "1",
  21. "iodebeziumtxdatacollectionorder": "1",
  22. "data" : { (10)
  23. "before" : null,
  24. "after" : {
  25. "pk" : 1,
  26. "name" : "Bob"
  27. }
  28. }
  29. }
1Unique ID that the connector generates for the change event based on the change event’s content.
2The source of the event, which is the logical name of the database as specified by the topic.prefix property in the connector’s configuration.
3The CloudEvents specification version.
4Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent. The value of CONNECTOR_TYPE is mongodb, mysql, postgresql, or sqlserver.
5Time of the change in the source database.
6Describes the content type of the data attribute, which is JSON in this example. The only alternative is Avro.
7An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.
8All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.
9When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.
10The actual data change itself. Depending on the operation and the connector, the data might contain before, after and/or patch fields.

The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.

  1. {
  2. "id" : "name:test_server;lsn:33227720;txId:578",
  3. "source" : "/debezium/postgresql/test_server",
  4. "specversion" : "1.0",
  5. "type" : "io.debezium.postgresql.datachangeevent",
  6. "time" : "2020-01-13T14:04:18.597Z",
  7. "datacontenttype" : "application/avro", (1)
  8. "dataschema" : "http://my-registry/schemas/ids/1", (2)
  9. "iodebeziumop" : "r",
  10. "iodebeziumversion" : "2.4.1.Final",
  11. "iodebeziumconnector" : "postgresql",
  12. "iodebeziumname" : "test_server",
  13. "iodebeziumtsms" : "1578924258597",
  14. "iodebeziumsnapshot" : "true",
  15. "iodebeziumdb" : "postgres",
  16. "iodebeziumschema" : "s1",
  17. "iodebeziumtable" : "a",
  18. "iodebeziumtxId" : "578",
  19. "iodebeziumlsn" : "33227720",
  20. "iodebeziumxmin" : null,
  21. "iodebeziumtxid": "578",
  22. "iodebeziumtxtotalorder": "1",
  23. "iodebeziumtxdatacollectionorder": "1",
  24. "data" : "AAAAAAEAAgICAg==" (3)
  25. }
1Indicates that the data attribute contains Avro binary data.
2URI of the schema to which the Avro data adheres.
3The data attribute contains base64-encoded Avro binary data.

It is also possible to use Avro for the envelope as well as the data attribute.

Example configuration

Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

  • Use JSON as the envelope.

  • Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.

  1. ...
  2. "value.converter": "io.debezium.converters.CloudEventsConverter",
  3. "value.converter.serializer.type" : "json", (1)
  4. "value.converter.data.serializer.type" : "avro",
  5. "value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
  6. ...
1Specifying the serializer.type is optional, because json is the default.

The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.

Configuration options

When you configure a Debezium connector to use the CloudEvent converter you can specify the following options.

Table 1. Descriptions of CloudEvents converter configuration options

Option

Default

Description

json

The encoding type to use for the CloudEvents envelope structure. The value can be json or avro.

json

The encoding type to use for the data attribute. The value can be json or avro.

N/A

Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed.

N/A

Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. For example, for Avro data, you would specify the avro.schema.registry.url option.

none

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be none or avro.