ConvertCloudEventToSaveableForm

Example

To convert a CloudEvent to a form suitable for the JdbcSinkConnector, configure the ConvertCloudEventToSaveableForm SMT in the Kafka Connect configuration for the connector. For example, suppose that CloudEvents are serialized and deserialized as JSON, and that the target table has id, source, type, and payload columns. In that case, use the following configuration for the SMT:

  "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
  ...
  "transforms": "convertCloudEvent",
  "transforms.convertCloudEvent.type": "io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm",
  "transforms.convertCloudEvent.fields.mapping": "id,source,type,data:payload",
  "transforms.convertCloudEvent.serializer.type": "json",
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.serializer.type": "json",
  "value.converter.data.serializer.type": "json"
  ...
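
To make the format of the fields.mapping value concrete, the following Python sketch parses it into field-to-column pairs. It is an illustration only, not the SMT's implementation: the function name parse_fields_mapping is hypothetical, and the rule that an entry without a colon maps to a column of the same name is inferred from the example in this section.

```python
def parse_fields_mapping(value: str) -> dict[str, str]:
    """Parse a fields.mapping string into {cloud_event_field: column_name}."""
    mapping = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # ignore empty entries such as a trailing comma
        # Assumption: "field:column" renames the field, while a bare
        # "field" is stored in a column with the same name.
        field, _, column = entry.partition(":")
        field = field.strip()
        mapping[field] = column.strip() or field
    return mapping


print(parse_fields_mapping("id,source,type,data:payload"))
```

With the mapping from the configuration above, the id, source, and type fields keep their names, and the data field is written to the payload column.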

The following example shows the value of a record before and after the transformation is applied.

Example 1. Effect of applying the ConvertCloudEventToSaveableForm SMT

Value before the SMT processes the record

  {
    "id": "624e6565-99ee-4fdb-9228-653138c3a7b3",
    "source": "/debezium/postgresql/book",
    "specversion": "1.0",
    "type": "BookCreated",
    "time": "2023-11-11T07:11:01.825Z",
    "datacontenttype": "application/json",
    "data": {
      "id": 4,
      "name": "1984",
      "publicationYear": 1949
    }
  }

Value after the SMT processes the record

  {
    "id": "624e6565-99ee-4fdb-9228-653138c3a7b3",
    "source": "/debezium/postgresql/book",
    "type": "BookCreated",
    "payload": "{\"id\": 4, \"name\": \"1984\", \"publicationYear\": 1949}"
  }
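
The before and after values can be reproduced with a short Python sketch. It only mimics the visible effect of the SMT on this example and makes no claim about how the SMT works internally: the function to_saveable_form is hypothetical, and storing nested values as JSON text is an assumption based on the payload value shown above.

```python
import json


def to_saveable_form(event: dict, mapping: dict[str, str]) -> dict:
    """Keep only the mapped CloudEvents fields, renamed to column names."""
    row = {}
    for field, column in mapping.items():
        value = event[field]
        # Assumption: nested structures are flattened to JSON text so that
        # they fit into a single table column.
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        row[column] = value
    return row


event = {
    "id": "624e6565-99ee-4fdb-9228-653138c3a7b3",
    "source": "/debezium/postgresql/book",
    "specversion": "1.0",
    "type": "BookCreated",
    "time": "2023-11-11T07:11:01.825Z",
    "datacontenttype": "application/json",
    "data": {"id": 4, "name": "1984", "publicationYear": 1949},
}
# Field-to-column pairs equivalent to "id,source,type,data:payload".
mapping = {"id": "id", "source": "source", "type": "type", "data": "payload"}

row = to_saveable_form(event, mapping)
print(json.dumps(row, indent=2))
```

Fields that are not listed in the mapping, such as specversion, time, and datacontenttype, do not appear in the resulting record.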

Configuration options

The following table lists the configuration options that you can use with the ConvertCloudEventToSaveableForm SMT.

Table 1. ConvertCloudEventToSaveableForm SMT configuration options

| Property | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| fields.mapping | A comma-separated list of pairs, each of which contains the name of a CloudEvents field followed by the name of a target table column. | list | No default value | non-empty list | high |
| serializer.type | The serializer type that is used to serialize and deserialize CloudEvents. Set this to the same value as value.converter.serializer.type. | string | No default value | json or avro | high |
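
The constraints on these options can be checked before deploying a connector. The following Python sketch is a hypothetical pre-flight check, not part of Debezium or Kafka Connect: the function check_smt_config and its messages are invented for illustration, and the config argument is assumed to be the connector configuration loaded as a flat dictionary.

```python
VALID_SERIALIZERS = {"json", "avro"}


def check_smt_config(config: dict, alias: str = "convertCloudEvent") -> list[str]:
    """Return a list of problems found in the SMT options (empty if none)."""
    problems = []
    prefix = f"transforms.{alias}."

    # fields.mapping must contain at least one non-blank entry.
    mapping = config.get(prefix + "fields.mapping", "")
    if not [entry for entry in mapping.split(",") if entry.strip()]:
        problems.append("fields.mapping must be a non-empty list")

    # serializer.type must be json or avro and match the converter setting.
    serializer = config.get(prefix + "serializer.type")
    if serializer not in VALID_SERIALIZERS:
        problems.append("serializer.type must be json or avro")
    elif serializer != config.get("value.converter.serializer.type"):
        problems.append(
            "serializer.type must equal value.converter.serializer.type"
        )
    return problems


config = {
    "transforms": "convertCloudEvent",
    "transforms.convertCloudEvent.fields.mapping": "id,source,type,data:payload",
    "transforms.convertCloudEvent.serializer.type": "json",
    "value.converter.serializer.type": "json",
}
print(check_smt_config(config))
```

An empty list means that the options satisfy the constraints listed in the table. Each string in a non-empty list names one violated constraint.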