ConvertCloudEventToSaveableForm
Example
To convert a CloudEvent to a form suitable for JdbcSinkConnector
, configure the ConvertCloudEventToSaveableForm
SMT in the Kafka Connect configuration for a connector. For example, to convert CloudEvents that were serialized and deserialized with JSON provided that the target database has id
, source
, type
, and payload
columns, use the following configuration for the SMT:
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
...
"transforms": "convertCloudEvent",
"transforms.convertCloudEvent.type": "io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm",
"transforms.convertCloudEvent.fields.mapping": "id,source,type,data:payload",
"transforms.convertCloudEvent.serializer.type": "json",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "json",
"value.converter.data.serializer.type": "json"
...
The following example shows the value of a record before and after the transformation is applied.
Example 1. Effect of applying the ConvertCloudEventToSaveableForm
SMT
Value before the SMT processes the record
{
"id": "624e6565-99ee-4fdb-9228-653138c3a7b3",
"source": "/debezium/postgresql/book",
"specversion": "1.0",
"type": "BookCreated",
"time": "2023-11-11T07:11:01.825Z",
"datacontenttype": "application/json",
"data": {
"id": 4,
"name": "1984",
"publicationYear": 1949
}
}
Value after the SMT processes the record
{
"id": "624e6565-99ee-4fdb-9228-653138c3a7b3",
"source": "/debezium/postgresql/book",
"type": "BookCreated",
"payload": "{"id": 4, "name": "1984", "publicationYear": 1949}"
}
Configuration options
The following table lists the configuration options that you can use with the ConvertCloudEventToSaveableForm
SMT.
Property | Description | Type | Default | Valid Values | Importance |
A comma-separated list of pairs each of which contains the name of a CloudEvents field followed by the name of a target table column | list | No default value | non-empty list | high | |
Serializer type that is used to serialize and deserialize CloudEvents. Should have the same value as | string | No default value |
| high | |
CloudEvents schema name under which the schema is registered in a Schema Registry. | string | No default value | low |