Exporting CloudEvents
Example event format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.
{
  "id" : "name:test_server;lsn:29274832;txId:565",   (1)
  "source" : "/debezium/postgresql/test_server",   (2)
  "specversion" : "1.0",   (3)
  "type" : "io.debezium.connector.postgresql.DataChangeEvent",   (4)
  "time" : "2020-01-13T13:55:39.738Z",   (5)
  "datacontenttype" : "application/json",   (6)
  "iodebeziumop" : "r",   (7)
  "iodebeziumversion" : "2.7.2.Final",   (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",   (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {   (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
Item | Description |
---|---|
1 | Unique ID that the connector generates for the change event based on the change event’s content. |
2 | The source of the event, which is the logical name of the database as specified by the topic.prefix property in the connector configuration. |
3 | The CloudEvents specification version. |
4 | Connector type that generated the change event. The format of this field is io.debezium.connector.CONNECTOR_TYPE.DataChangeEvent, where CONNECTOR_TYPE identifies the source connector, in this example, postgresql. |
5 | Time of the change in the source database. |
6 | Describes the content type of the data attribute, which is JSON in this example. |
7 | An operation identifier. Possible values are r for read, c for create, u for update, or d for delete. |
8 | All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes that use the iodebezium prefix. |
9 | When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute that uses the iodebeziumtx prefix. |
10 | The actual data change. Depending on the operation and the connector, the data might contain before, after, or patch fields. |
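No CloudEvents-specific options are required to produce the preceding format. The following fragment is a minimal sketch in which the JSON defaults are spelled out explicitly:
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "json"
...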
The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.
{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.connector.postgresql.DataChangeEvent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",   (1)
  "dataschema" : "http://my-registry/schemas/ids/1",   (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "2.7.2.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="   (3)
}
Item | Description |
---|---|
1 | Indicates that the data attribute contains Avro binary data. |
2 | URI of the schema to which the Avro data adheres. |
3 | The data attribute contains base64-encoded Avro binary data. |
It is also possible to use Avro for the envelope as well as the data attribute.
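For example, the following fragment, a minimal sketch that reuses the registry URL from the examples above, configures Avro for both the envelope and the data attribute:
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "avro",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...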
Example configuration
Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:
Use JSON as the envelope.
Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", (1)
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
Item | Description |
---|---|
1 | Specifying the serializer.type is optional, because json is the default. |
The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.
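For example, the following fragment, a sketch that assumes string-typed record keys, pairs a key converter with the CloudEvents value converter:
...
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.debezium.converters.CloudEventsConverter"
...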
Configuration of sources of metadata and some CloudEvents fields
By default, the metadata.source property consists of a source part followed by three field-specific parts, as seen in the following example:
"value,id:generate,type:generate,dataSchemaName:generate"
The first part specifies the source for retrieving a record’s metadata; the permitted values are value and header. The remaining parts specify how the converter populates values for the following metadata fields:
id
type
dataSchemaName (the name under which the schema is registered in the Schema Registry)
The converter can use one of the following methods to populate each field:
generate
The converter generates a value for the field.
header
The converter obtains values for the field from a message header.
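For example, the following setting, shown only as an illustration, keeps value as the record metadata source, obtains the type field from a message header, and generates the other fields:
"value.converter.metadata.source": "value,id:generate,type:header,dataSchemaName:generate"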
Obtaining record metadata
To construct a CloudEvent, the converter requires source, operation, and transaction metadata. Generally, the converter can retrieve the metadata from a record’s value. But in some cases, before the converter receives a record, the record might be processed in such a way that metadata is not present in its value, for example, after the record is processed by the Outbox Event Router SMT. To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.
Procedure
1. Implement a mechanism for recording the metadata in the record’s headers before the record reaches the converter, for example, by using the HeaderFrom SMT.
2. Set the value of the converter’s metadata.source property to header.
The following example shows the configuration for a connector that uses the Outbox Event Router SMT and the HeaderFrom SMT:
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
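With this configuration in effect, each non-heartbeat record reaches the converter with source, op, and transaction headers. The following sketch is only an approximation of the header content; the exact fields depend on the connector and on the header.converter settings:
...
"source" : { "connector" : "postgresql", "name" : "test_server", "lsn" : 29274832 },
"op" : "r",
"transaction" : null
...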
To use the HeaderFrom transformation, it might be necessary to filter tombstone and heartbeat messages.
The header value of the metadata.source property is a global setting. As a result, even if you omit parts of the property’s value, such as the id and type sources, the converter obtains values for the omitted parts from headers.
Obtaining CloudEvent metadata
By default, the CloudEvents converter automatically generates values for the id and type fields of a CloudEvent, and generates the schema name for its data field. You can customize the way that the converter populates these fields by changing the defaults and specifying the fields’ values in the appropriate headers. For example:
"value.converter.metadata.source": "value,id:header,type:header,dataSchemaName:header"
With the preceding configuration in effect, you could configure upstream functions to add id and type headers with the values that you want to pass to the CloudEvents converter.
If you want to provide a value only for the id header, use:
"value.converter.metadata.source": "value,id:header,type:generate,dataSchemaName:generate"
To configure the converter to obtain id, type, and dataSchemaName metadata from headers, use the following short syntax:
"value.converter.metadata.source": "header"
To enable the converter to retrieve the data schema name from a header field, you must set schema.data.name.source.header.enable to true.
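For example, the following fragment, a minimal sketch, enables the flag and directs the converter to read the data schema name from a header while the other fields keep their defaults:
...
"value.converter.metadata.source": "value,id:generate,type:generate,dataSchemaName:header",
"value.converter.schema.data.name.source.header.enable": "true"
...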
Configuration options
When you configure a Debezium connector to use the CloudEvents converter, you can specify the following options.
Option | Default | Description |
---|---|---|
serializer.type | json | The encoding type to use for the CloudEvents envelope structure. The value can be json or avro. |
data.serializer.type | json | The encoding type to use for the data attribute. The value can be json or avro. |
json. ... | N/A | Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed. |
avro. ... | N/A | Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. |
schema.name.adjustment.mode | none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be none or avro. |
schema.cloudevents.name | none | Specifies the CloudEvents schema name under which the schema is registered in a Schema Registry. The setting is ignored when serializer.type is json. |
schema.data.name.source.header.enable | false | Specifies whether the converter can retrieve the schema name of the CloudEvents data attribute from a header. |
extension.attributes.enable | true | Specifies whether the converter includes extension attributes when it generates a cloud event. The value can be true or false. |
metadata.source | value,id:generate,type:generate,dataSchemaName:generate | A comma-separated list that specifies the sources from which the converter retrieves metadata values (source, operation, transaction) for a CloudEvent, as well as how it populates the id and type fields and the data schema name. For configuration examples, see Configuration of sources of metadata and some CloudEvents fields. |
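The following fragment, a hypothetical sketch that combines several of these options, passes the schemas.enable option through to the underlying JSON converter (the json. prefix is removed before the option is applied) and disables extension attributes:
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "json",
"value.converter.data.serializer.type": "json",
"value.converter.json.schemas.enable": false,
"value.converter.extension.attributes.enable": false
...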