Exporting CloudEvents

Example event format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.

  1. {
  2. "id" : "name:test_server;lsn:29274832;txId:565", (1)
  3. "source" : "/debezium/postgresql/test_server", (2)
  4. "specversion" : "1.0", (3)
  5. "type" : "io.debezium.postgresql.datachangeevent", (4)
  6. "time" : "2020-01-13T13:55:39.738Z", (5)
  7. "datacontenttype" : "application/json", (6)
  8. "iodebeziumop" : "r", (7)
  9. "iodebeziumversion" : "2.5.4.Final", (8)
  10. "iodebeziumconnector" : "postgresql",
  11. "iodebeziumname" : "test_server",
  12. "iodebeziumtsms" : "1578923739738",
  13. "iodebeziumsnapshot" : "true",
  14. "iodebeziumdb" : "postgres",
  15. "iodebeziumschema" : "s1",
  16. "iodebeziumtable" : "a",
  17. "iodebeziumlsn" : "29274832",
  18. "iodebeziumxmin" : null,
  19. "iodebeziumtxid": "565", (9)
  20. "iodebeziumtxtotalorder": "1",
  21. "iodebeziumtxdatacollectionorder": "1",
  22. "data" : { (10)
  23. "before" : null,
  24. "after" : {
  25. "pk" : 1,
  26. "name" : "Bob"
  27. }
  28. }
  29. }
Table 1. Descriptions of fields in a CloudEvents change event record
ItemDescription

1

Unique ID that the connector generates for the change event based on the change event’s content.

2

The source of the event, which is the logical name of the database as specified by the topic.prefix property in the connector’s configuration.

3

The CloudEvents specification version.

4

Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent. Valid values for CONNECTOR_TYPE are db2, informix, mongodb, mysql, oracle, postgresql, or sqlserver.

5

Time of the change in the source database.

6

Describes the content type of the data attribute. Possible values are json, as in this example, or avro.

7

An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.

8

All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.

9

When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.

10

The actual data change. Depending on the operation and the connector, the data might contain before, after, or patch fields.

The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.

  1. {
  2. "id" : "name:test_server;lsn:33227720;txId:578",
  3. "source" : "/debezium/postgresql/test_server",
  4. "specversion" : "1.0",
  5. "type" : "io.debezium.postgresql.datachangeevent",
  6. "time" : "2020-01-13T14:04:18.597Z",
  7. "datacontenttype" : "application/avro", (1)
  8. "dataschema" : "http://my-registry/schemas/ids/1", (2)
  9. "iodebeziumop" : "r",
  10. "iodebeziumversion" : "2.5.4.Final",
  11. "iodebeziumconnector" : "postgresql",
  12. "iodebeziumname" : "test_server",
  13. "iodebeziumtsms" : "1578924258597",
  14. "iodebeziumsnapshot" : "true",
  15. "iodebeziumdb" : "postgres",
  16. "iodebeziumschema" : "s1",
  17. "iodebeziumtable" : "a",
  18. "iodebeziumtxId" : "578",
  19. "iodebeziumlsn" : "33227720",
  20. "iodebeziumxmin" : null,
  21. "iodebeziumtxid": "578",
  22. "iodebeziumtxtotalorder": "1",
  23. "iodebeziumtxdatacollectionorder": "1",
  24. "data" : "AAAAAAEAAgICAg==" (3)
  25. }
Table 2. Descriptions of fields in a CloudEvents event record for a connector that uses Avro to format data
ItemDescription

1

Indicates that the data attribute contains Avro binary data.

2

URI of the schema to which the Avro data adheres.

3

The data attribute contains base64-encoded Avro binary data.

It is also possible to use Avro for the envelope as well as the data attribute.

Example configuration

Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

  • Use JSON as the envelope.

  • Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.

  1. ...
  2. "value.converter": "io.debezium.converters.CloudEventsConverter",
  3. "value.converter.serializer.type" : "json", (1)
  4. "value.converter.data.serializer.type" : "avro",
  5. "value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
  6. ...
Table 3. Description of fields in CloudEvents converter configuration
ItemDescription

1

Specifying the serializer.type is optional, because json is the default.

The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.

Configuration of sources of metadata and some CloudEvents fields

By default, the metadata.source property consists of three parts, as seen in the following example:

  1. "value,id:generate,type:generate"

The first part specifies the source for retrieving a record’s metadata; the permitted values are value and header. The next parts specify how to obtain the id and type fields of a CloudEvent; the permitted values are generate and header.

Obtaining record metadata

To construct a CloudEvent, the converter requires source, operation, and transaction metadata. Generally, the converter can retrieve the metadata from a record’s value. But in some cases, before the converter receives a record, the record might be processed in such a way that metadata is not present in its value, for example, after the record is processed by the Outbox Event Router SMT. To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.

Procedure

  1. Implement a mechanism for recording the metadata in the record’s headers before the record reaches the converter, for example, by using the HeaderFrom SMT.

  2. Set the value of the converter’s metadata.source property to header.

The following example shows the configuration for a connector that uses the Outbox Event Router SMT, and the HeaderFrom SMT:

  1. ...
  2. "tombstones.on.delete": false,
  3. "transforms": "addMetadataHeaders,outbox",
  4. "transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
  5. "transforms.addMetadataHeaders.fields": "source,op,transaction",
  6. "transforms.addMetadataHeaders.headers": "source,op,transaction",
  7. "transforms.addMetadataHeaders.operation": "copy",
  8. "transforms.addMetadataHeaders.predicate": "isHeartbeat",
  9. "transforms.addMetadataHeaders.negate": true,
  10. "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  11. "transforms.outbox.table.expand.json.payload": true,
  12. "transforms.outbox.table.fields.additional.placement": "type:header",
  13. "predicates": "isHeartbeat",
  14. "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  15. "predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
  16. "value.converter": "io.debezium.converters.CloudEventsConverter",
  17. "value.converter.metadata.source": "header",
  18. "header.converter": "org.apache.kafka.connect.json.JsonConverter",
  19. "header.converter.schemas.enable": true
  20. ...
To use the HeaderFrom transformation, it might be necessary to filter tombstone and heartbeat messages.

The header value of the metadata.source property is a global setting. As a result, even if you omit parts of a property’s value, such as the id and type sources, the converter generates header values for the omitted parts.

Obtaining id and type of a CloudEvent

By default, the CloudEvents converter automatically generates values for id and type fields of a CloudEvent. You can customize the way that the converter populates these fields by changing the defaults and specifying the fields’ values in the appropriate headers. For example:

  1. "value.converter.metadata.source": "value,id:header,type:header"

With the preceding configuration in effect, you could configure upstream functions to add id and type headers with the values that you want to pass to the CloudEvents converter.

If you want to provide values only for id header, use:

  1. "value.converter.metadata.source": "value,id:header,type:generate"

To provide metadata, id, and type in headers, use the short syntax:

  1. "value.converter.metadata.source": "header"

Configuration options

When you configure a Debezium connector to use the CloudEvent converter you can specify the following options.

Table 4. Descriptions of CloudEvents converter configuration options

Option

Default

Description

json

The encoding type to use for the CloudEvents envelope structure. The value can be json or avro.

json

The encoding type to use for the data attribute. The value can be json or avro.

N/A

Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed.

N/A

Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. For example, for Avro data, you would specify the avro.schema.registry.url option.

none

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be none or avro.

none

Specifies CloudEvents schema name under which the schema is registered in a Schema Registry. The setting is ignored when serializer.type is json in which case the value is schemaless. If not set the default algorithm will be used to generate the schema name: ${serverName}.${databaseName}.CloudEvents.Envelope.

true

Specifies whether the converter includes extension attributes when it generates a cloud event. The value can be true or false.

value,id:generate,type:generate

A comma-separated list that specifies the sources from which the converter retrieves metadata (source, operation, transaction), along with the names of the CloudEvent id, and type fields. The first element in the list is a global setting that specifies the source of the metadata. The source of metadata can be value or header. This first element is followed by a set of pairs that specify the name of a CloudEvent field (id or type), and the source for obtaining the field’s value: generate or header. Separate the values in each pair with a colon, for example:

value,id:header,type:generate