MongoDB New Document State Extraction

Change event structure

The Debezium MongoDB connector generates change events that have a complex structure. Each event message includes the following parts:

Source metadata

Includes, but is not limited to, the following fields:

  • Type of the operation that changed data in the collection (create/insert, update, or delete).

  • Name of the database and collection in which the change occurred.

  • Timestamp that identifies when the change was made.

  • Optional transaction information.

Document data

  • before data

    This field is present in environments that run MongoDB 6.0 and later when the capture.mode for the Debezium connector is set to one of the following values:

    • change_streams_with_pre_image

    • change_streams_update_full_with_pre_image

      For more information, see MongoDB pre-image support.

  • after data

    JSON strings that represent the values that are present in a document after the current operation. The presence of an after field in an event message depends on the type of event and the connector configuration. A create event for a MongoDB insert operation always contains an after field, regardless of the capture.mode setting. For update events, the after field is present only when capture.mode is set to one of the following values:

    • change_streams_update_full

    • change_streams_update_full_with_pre_image

      The after value in a change event message does not necessarily represent the state of a document immediately following the event. The value is not calculated dynamically; instead, after the connector captures a change event, it queries the collection to retrieve the current value of the document.

      For example, imagine a situation in which multiple operations, a, b, and c, modify a document in quick succession. When the connector processes change a, it queries the collection for the full document. In the meantime, changes b and c occur. When the connector receives a response to its query for the full document for change a, it might receive a version of the document that is based on the subsequent changes for b or c. For more information, see the documentation for the capture.mode property.
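The timing issue described above can be illustrated with a small simulation. This is only a sketch: the in-memory collection and the helper names (apply_change, lookup_full_document) are hypothetical stand-ins, not part of Debezium or the MongoDB driver.

```python
# Minimal simulation of a full-document lookup that lags behind the changes.
# All names here are illustrative; nothing below is a Debezium or MongoDB API.

collection = {1: {"_id": 1, "status": "new"}}

def apply_change(doc_id, field, value):
    """Apply an update to the stored document, as the database would."""
    collection[doc_id][field] = value

def lookup_full_document(doc_id):
    """Return a copy of the document as it exists at lookup time."""
    return dict(collection[doc_id])

# Changes a, b, and c modify the same document in quick succession.
apply_change(1, "status", "a")
apply_change(1, "status", "b")
apply_change(1, "status", "c")

# The lookup for change a runs only now, after b and c were already applied,
# so the "after" value for change a reflects the state left by change c.
after_for_change_a = lookup_full_document(1)
print(after_for_change_a)
```

In this sketch the value reported for change a carries the status written by change c, which is the situation that the example above warns about.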

The following fragment shows the basic structure of a create change event that the connector emits after a MongoDB insert operation:

  {
    "op": "c",
    "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
    "source": { ... }
  }

The complex format of the after field in the preceding example provides detailed information about changes that occur in the source database. However, some consumers cannot process messages that contain nested values. To convert the complex nested fields of the original message into a simpler, more universally compatible structure, use the event flattening SMT for MongoDB. The SMT flattens the structure of nested fields in a message, as shown in the following example:

  {
    "field1" : "newvalue1",
    "field2" : "newvalue2"
  }
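As a rough illustration of the idea, and not of the SMT's actual implementation, the following Python sketch parses the JSON string in the after field of a create event and returns only that content. The function name extract_new_document_state is chosen here for readability and is an assumption of this example.

```python
import json

def extract_new_document_state(event):
    """Return the parsed content of the 'after' field, or None if it is absent.

    Illustrative only: the real SMT also builds a typed Kafka Connect schema.
    """
    after = event.get("after")
    if after is None:
        return None
    return json.loads(after)

# A create event shaped like the fragment shown earlier in this section.
event = {
    "op": "c",
    "after": '{"field1":"newvalue1","field2":"newvalue2"}',
    "source": {},
}

flattened = extract_new_document_state(event)
print(flattened)
```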

For more information about the default structure of messages produced by the Debezium MongoDB connector, see the connector documentation.

Behavior

The event flattening SMT for MongoDB extracts the after field from create or update change event messages emitted by the Debezium MongoDB connector. After the SMT processes the original change event message, it generates a simplified version that contains only the contents of the after field.

Depending on your use case, you can apply the ExtractNewDocumentState SMT to the Debezium MongoDB connector, or to a sink connector that consumes messages that the Debezium connector produces. If you apply the SMT to the Debezium MongoDB connector, the SMT modifies messages that the connector emits before they are sent to Apache Kafka. To ensure that Kafka retains the complete Debezium change event message in its original format, apply the SMT to a sink connector.

When you use the event flattening SMT to process a message emitted from a MongoDB connector, the SMT converts the structure of the records in the original message into properly typed Kafka Connect records that can be consumed by a typical sink connector. For example, the SMT converts the JSON strings that represent the after information in the original message into schema structures that any consumer can process.

Optionally, you can configure the event flattening SMT for MongoDB to modify messages in other ways during processing. For more information, see the configuration topic.

Configuration

Configure the event flattening (ExtractNewDocumentState) SMT for MongoDB for sink connectors that consume the messages emitted by the Debezium MongoDB connector.

Basic configuration

To obtain the default behavior of the SMT, add the SMT to the configuration of a sink connector without specifying any options, as in the following example:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

As with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases. Kafka Connect applies the transformations that you specify in the order in which they are listed.

You can set multiple options for a connector that uses the MongoDB event flattening SMT. The following example shows a configuration that sets the drop.tombstones, delete.handling.mode, and add.headers options for a connector:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
  transforms.unwrap.drop.tombstones=false
  transforms.unwrap.delete.handling.mode=drop
  transforms.unwrap.add.headers=op

For more information about the configuration options in the preceding example, see the configuration topic.

Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
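In Kafka Connect, predicates are declared in the connector configuration. The Python sketch below only models the underlying idea: a predicate decides whether the transformation runs, and records that do not match pass through unchanged. The predicate rule (a record counts as a data change event if it has both an op and a source field) and the shape of the heartbeat record are assumptions made for this example.

```python
import json

def is_data_change_event(value):
    """Hypothetical predicate: a data change event carries 'op' and 'source'."""
    return isinstance(value, dict) and "op" in value and "source" in value

def unwrap(value):
    """Simplified stand-in for the flattening step: parse the 'after' string."""
    after = value.get("after")
    return json.loads(after) if after is not None else None

def apply_selectively(value, transform=unwrap, predicate=is_data_change_event):
    """Apply the transform only to records that satisfy the predicate."""
    return transform(value) if predicate(value) else value

# Assumed record shapes, for illustration only.
heartbeat = {"ts_ms": 1700000000000}
change = {"op": "c", "after": '{"field1": "newvalue1"}', "source": {}}

print(apply_selectively(heartbeat))  # passes through untouched
print(apply_selectively(change))     # flattened
```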

Array encoding

By default, the event flattening SMT converts MongoDB arrays into arrays that are compatible with Apache Kafka Connect or Apache Avro schemas. While MongoDB arrays can contain multiple types of elements, all elements in a Kafka Connect array must be of the same type.

To ensure that the SMT encodes arrays in a way that meets the needs of your environment, you can specify the array.encoding configuration option. The following example shows the configuration for setting the array encoding:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
  transforms.unwrap.array.encoding=<array|document>

Depending on the configuration, the SMT processes each instance of an array in the source message by using one of the following encoding methods:

array encoding

If array.encoding is set to array (the default), the SMT uses the array datatype to encode arrays in the original message. To ensure correct processing, all elements in an array instance must be of the same type. This option is restrictive, but it enables downstream clients to easily process arrays.

document encoding

If array.encoding is set to document, the SMT converts each array in the source into a struct of structs, in a manner that is similar to BSON serialization. The main struct contains fields named _0, _1, _2, and so on, where each field name represents the index of an element in the original array. The SMT populates each of these index fields with the values that it retrieves for the equivalent element in the source array. Index names are prefixed with underscores, because Avro encoding prohibits field names that begin with a numeric character.

The following example shows how the Debezium MongoDB connector represents a database document that contains an array that includes heterogeneous data types:

Example 1. Document encoding of an array that contains multiple data types

  {
    "_id": 1,
    "a1": [
      {
        "a": 1,
        "b": "none"
      },
      {
        "a": "c",
        "d": "something"
      }
    ]
  }

If the array.encoding is set to document, the SMT converts the preceding document into the following format:

  {
    "_id": 1,
    "a1": {
      "_0": {
        "a": 1,
        "b": "none"
      },
      "_1": {
        "a": "c",
        "d": "something"
      }
    }
  }

The document encoding option enables the SMT to process arbitrary arrays that are composed of heterogeneous elements. However, before you use this option, always verify that the sink connector and other downstream consumers are capable of processing arrays that contain multiple data types.
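The two encoding methods can be sketched in a few lines of Python. This is a simplified model, not the SMT's code: the function name encode_array is hypothetical, and the homogeneity check compares only the Python type of each element, whereas a real schema comparison would also look inside nested documents.

```python
def encode_array(values, mode="array"):
    """Encode a list the way the two array.encoding settings are described.

    mode="array":    keep a list, but require a single element type.
    mode="document": build a struct whose fields are named _0, _1, _2, ...
    """
    if mode == "array":
        element_types = {type(v) for v in values}
        if len(element_types) > 1:
            raise ValueError("array encoding requires elements of a single type")
        return list(values)
    if mode == "document":
        # Index names get an underscore prefix because Avro field names
        # cannot begin with a numeric character.
        return {f"_{index}": value for index, value in enumerate(values)}
    raise ValueError(f"unknown array.encoding value: {mode}")

a1 = [{"a": 1, "b": "none"}, {"a": "c", "d": "something"}]
print(encode_array(a1, mode="document"))
```

With mode set to document, the mixed array from the example above becomes a struct with the fields _0 and _1.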

Nested structure flattening

When a database operation involves an embedded document, the Debezium MongoDB connector emits a Kafka event record that has a structure that reflects the hierarchical structure of the original document. That is, the event message represents nested documents as a set of nested field structures. In environments where downstream connectors cannot process messages that contain nested structures, you can configure the event flattening SMT to flatten hierarchical structures in the message. A flat message structure is better suited to table-like storage.

To configure the SMT to flatten nested structures, set the flatten.struct configuration option to true. In the converted message, field names are constructed to be consistent with the document source. The SMT renames each flattened field by concatenating the name of the parent document field with the name of the nested document field. A delimiter that is defined by the flatten.struct.delimiter option separates the components of the name. The default value of flatten.struct.delimiter is an underscore character (_).

The following example shows the configuration for specifying whether the SMT flattens nested structures:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
  transforms.unwrap.flatten.struct=<true|false>
  transforms.unwrap.flatten.struct.delimiter=<string>

The following example shows an event message that is emitted by the MongoDB connector. The message includes a field for a document a that contains two nested fields, b and c:

  {
    "_id": 1,
    "a": {
      "b": 1,
      "c": "none"
    },
    "d": 100
  }

The message in the following example shows the output after the SMT for MongoDB flattens the nested structures in the preceding message:

  {
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
  }

In the resulting message, the b and c fields that were nested in the original message are flattened and renamed. The renamed fields are formed by concatenating the name of the parent document a with the names of the nested fields: a_b and a_c. The components of the new field names are separated by an underscore character, as defined by the setting of the flatten.struct.delimiter configuration property.
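The renaming rule can be expressed as a short recursive function. The sketch below is a plain-Python model of the behavior described above, with a hypothetical function name (flatten_struct). It assumes that deeper nesting levels are joined with the same delimiter, which this section does not state explicitly.

```python
def flatten_struct(document, delimiter="_", parent=""):
    """Flatten nested dictionaries by joining parent and child field names."""
    flat = {}
    for name, value in document.items():
        full_name = f"{parent}{delimiter}{name}" if parent else name
        if isinstance(value, dict):
            # Recurse into the nested document, carrying the joined name along.
            flat.update(flatten_struct(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat

message = {"_id": 1, "a": {"b": 1, "c": "none"}, "d": 100}
print(flatten_struct(message))
print(flatten_struct(message, delimiter="."))
```

The delimiter argument plays the role of flatten.struct.delimiter, so passing a different string changes only the separator in the generated names.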

MongoDB $unset handling

In MongoDB, the $unset operator and the $rename operator both remove fields from a document. Because MongoDB collections are schemaless, after an update removes fields from a document, it’s not possible to infer the name of the missing field from the updated document. To support sink connectors or other consumers that might require information about removed fields, Debezium emits update messages that include a removedFields element that lists the names of the deleted fields.

The following example shows part of an update message for an operation that results in the removal of the field a:

  "payload": {
    "op": "u",
    "ts_ms": "...",
    "ts_us": "...",
    "ts_ns": "...",
    "before": "{ ... }",
    "after": "{ ... }",
    "updateDescription": {
      "removedFields": ["a"],
      "updatedFields": null,
      "truncatedArrays": null
    }
  }

In the preceding example, the before and after fields represent the state of the source document before and after the document was updated. These fields are present in the event message that a connector emits only if the capture.mode for the connector is set as described in the following list:

before field

Provides the state of the document before the change. This field is present only when capture.mode is set to one of the following values:

  • change_streams_with_pre_image

  • change_streams_update_full_with_pre_image

after field

Provides the full state of the document after a change. This field is present only when capture.mode is set to one of the following values:

  • change_streams_update_full

  • change_streams_update_full_with_pre_image

Assuming a connector that is configured to capture full documents, when the ExtractNewDocumentState SMT receives an update message for an $unset event, the SMT re-encodes the message by representing the removed field as a null value, as shown in the following example:

  {
    "id": 1,
    "a": null
  }

For connectors that are not configured to capture full documents, when the SMT receives an update event for an $unset operation, it produces the following output message:

  {
    "a": null
  }
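Both cases can be modelled with one small function. The following Python sketch is an approximation of the behavior described above, not the SMT's implementation. The function name flatten_update and the sample field b are made up for the example.

```python
import json

def flatten_update(payload):
    """Build the flattened value for an update, marking removed fields as null."""
    after = payload.get("after")
    # Without a full document, start from an empty record.
    document = json.loads(after) if after else {}
    description = payload.get("updateDescription") or {}
    for name in description.get("removedFields") or []:
        document[name] = None
    return document

# Connector captures full documents: 'after' is present.
with_full_document = {
    "op": "u",
    "after": '{"b": 2}',
    "updateDescription": {"removedFields": ["a"], "updatedFields": None},
}

# Connector does not capture full documents: only the update description.
without_full_document = {
    "op": "u",
    "updateDescription": {"removedFields": ["a"], "updatedFields": None},
}

print(flatten_update(with_full_document))
print(flatten_update(without_full_document))
```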

Determine original operation

After the SMT flattens an event message, the resulting message no longer indicates whether the operation that generated the event was of type create, update or initial snapshot read. Typically, you can identify delete operations by configuring the connectors to expose information about the tombstone or rewrite events that accompany a deletion. For more information about configuring the connector to expose information about tombstones and rewrites in event messages, see the drop.tombstones and delete.handling.mode properties.
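The handling of delete events and tombstones is easier to follow as a decision table. The sketch below models the four values of the delete.tombstone.handling.mode option, which the configuration options table lists as the replacement for drop.tombstones and delete.handling.mode. It is a simplified model with hypothetical names. It assumes that the tombstone mode removes the delete event itself, and it uses the key deleted as worded in the table; the field name in real records might differ.

```python
MODES = ("drop", "tombstone", "rewrite", "rewrite-with-tombstone")

def handle_delete_record(kind, before=None, mode="tombstone"):
    """Return (keep, value) for a 'delete' event or a 'tombstone' record."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    if kind == "tombstone":
        # Tombstones have a null value; only two modes keep them.
        return mode in ("tombstone", "rewrite-with-tombstone"), None
    if kind == "delete":
        if mode in ("rewrite", "rewrite-with-tombstone"):
            value = dict(before or {})
            value["deleted"] = True  # key name follows the wording in the table
            return True, value
        # Assumption: 'drop' and 'tombstone' both remove the delete event.
        return False, None
    raise ValueError(f"unknown record kind: {kind}")

print(handle_delete_record("delete", {"_id": 1}, mode="rewrite"))
print(handle_delete_record("tombstone", mode="rewrite"))
```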

To report the type of a database operation in an event message, the SMT can add an op field to one of the following elements:

  • The event message body.

  • A message header.

For example, to add a header property that shows the type of the original operation, add the transform, and then add the add.headers property to the connector configuration, as in the following example:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
  transforms.unwrap.add.headers=op

Based on the preceding configuration, the SMT reports the event type by adding an op header to the message and assigning it a string value to identify the type of the operation. The assigned string value is based on the op field value in the original MongoDB change event message.
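To make the effect concrete, the following Python sketch returns a header map together with the flattened value. It is an illustration only: the function name is hypothetical, headers are modelled as a plain dictionary, and the header name is built with the double-underscore prefix that the configuration options describe as the default.

```python
import json

def unwrap_with_op_header(event, header_prefix="__"):
    """Return (headers, value): the op code goes to a header, 'after' to the value."""
    headers = {f"{header_prefix}op": event["op"]}
    after = event.get("after")
    value = json.loads(after) if after else None
    return headers, value

event = {"op": "c", "after": '{"field1": "newvalue1"}', "source": {}}
headers, value = unwrap_with_op_header(event)
print(headers, value)
```

Keeping the operation type in a header leaves the message body identical to the document content, which can matter for sinks that map every body field to a column.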

Adding metadata fields

The event flattening SMT for MongoDB can add metadata fields from the original change event message to the simplified message. The added metadata fields are prefixed with a double underscore ("__"). Adding metadata to the event record makes it possible to include content such as the name of the collection in which a change event occurred, or to include connector-specific fields, such as a replica set name. Currently, the SMT can add fields from the following change event sub-structures only: source, transaction and updateDescription.

For more information about the MongoDB change event structure, see the MongoDB connector documentation.

For example, you might specify the following configuration to add the replica set name (rs) and the collection name for a change event to the final flattened event record:

  transforms=unwrap,...
  transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
  transforms.unwrap.add.fields=rs,collection

The preceding configuration results in the following content being added to the flattened record:

  { "__rs" : "rs0", "__collection" : "my-collection", ... }

If you want the SMT to add metadata fields to delete events, set the value of the delete.handling.mode option to rewrite.
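The add.fields behavior shown in this section can be approximated as a lookup across the supported sub-structures followed by a prefixed copy. The Python below is a sketch under stated assumptions: the function name is hypothetical, and a field that is not found is added with a null value, which is one possible reading of the option description.

```python
SUPPORTED_STRUCTS = ("source", "transaction", "updateDescription")

def add_metadata_fields(flattened, event, field_names, prefix="__"):
    """Copy the named metadata fields into the flattened record with a prefix."""
    result = dict(flattened)
    for name in field_names:
        value = None  # assumption: a missing field is still added, as null
        for struct_name in SUPPORTED_STRUCTS:
            struct = event.get(struct_name) or {}
            if name in struct:
                value = struct[name]
                break
        result[prefix + name] = value
    return result

event = {
    "op": "c",
    "after": '{"field1": "newvalue1"}',
    "source": {"rs": "rs0", "collection": "my-collection"},
}
record = add_metadata_fields({"field1": "newvalue1"}, event, ["rs", "collection"])
print(record)
```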

Options for applying the transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

Configuration options

The following table describes the configuration options for the MongoDB event flattening SMT.

Each entry lists the property name, its default value, and a description.

array.encoding

Default: array

Specifies the format that the SMT uses when it encodes arrays that it reads from the original event message. Set one of the following options:

    array

    The SMT uses the array datatype to encode MongoDB arrays into a format that is compatible with Apache Kafka Connect or Apache Avro schemas. If you set this option, verify that the elements in each array instance are of the same type. Although MongoDB allows arrays to contain multiple data types, some downstream clients cannot process arrays whose elements are of different types.

    document

    The SMT converts each MongoDB array into a struct of structs, in a manner that is similar to BSON serialization. The main struct contains fields with the names _0, _1, _2, and so forth. To comply with Avro naming standards, the SMT prefixes the numeric name of each index field with an underscore. Each of the numeric field names represents the index of an element in the original array. The SMT populates each of these index fields with the value that it retrieves from the source document for the designated array element.

For more information about the array.encoding option, see the options for encoding arrays in MongoDB event messages.

flatten.struct

Default: false

When set to true, the SMT flattens structures (structs) in the original event message by concatenating the names of nested properties in the message, separated by a configurable delimiter, to form a simple field name.

flatten.struct.delimiter

Default: _ (underscore)

When flatten.struct is set to true, specifies the delimiter that the transformation inserts between field names that it concatenates from the input record to generate field names in the output record.

drop.tombstones

Default: true

Debezium generates a tombstone record for each delete operation. By default, the event flattening SMT removes tombstone records from the stream. To retain tombstone records in the stream, specify drop.tombstones=false.

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.handling.mode

Default: drop

Specifies how the SMT handles the change event records that Debezium generates for delete operations. Set one of the following options:

    drop

    The SMT removes records for delete operations from the event stream.

    none

    The SMT retains the original change event record in the event stream. The record contains only "value": "null".

    rewrite

    The SMT retains a modified version of the change event record in the stream. To provide another way to indicate that the record was deleted, the modified record includes a value field that contains the key/value pairs from the original record, and adds deleted: true to the value.

    If you set the rewrite option, you might find that the updated, simplified records for DELETE operations are sufficient for tracking deleted records. In such a case, you might want the SMT to drop tombstone records.

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.tombstone.handling.mode

Default: tombstone

Debezium generates a change event record for each DELETE operation. This setting determines how the MongoDB event flattening SMT handles DELETE events in the stream.

The setting for this option takes precedence over any conflicting settings that you might configure for the deprecated drop.tombstones or delete.handling.mode options.

Set one of the following options:

    drop

    The SMT removes both DELETE events and TOMBSTONE records from the stream.

    tombstone (default)

    The SMT retains TOMBSTONE records in the stream. The TOMBSTONE record contains only the following value: "value": "null".

    rewrite

    The SMT retains the change event record in the stream and makes the following changes:

    • Adds a value field to the record that contains the key/value pairs from the before field of the original record.

    • Adds deleted: true to the value of the record.

    • Removes TOMBSTONE records.

      This setting provides another way to indicate that the record has been deleted.

    rewrite-with-tombstone

    The SMT behaves as it does when you select the rewrite option, except that it also retains TOMBSTONE records.

add.headers.prefix

Default: __ (double-underscore)

Specifies an optional string to prefix to a header name.

add.headers

Default: No default value

Specifies a comma-separated list, with no spaces, of metadata fields that you want the SMT to add to the header of simplified messages. When the original message contains duplicate field names, you can identify the specific field to modify by providing the name of the struct together with the name of the field, for example, source.ts_ms.

Optionally, you can override the original name of a field and assign it a new name by adding an entry in the following format to the list:

<field_name>:<new_field_name>.

For example:

  version:VERSION,connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP

The new name values that you specify are case-sensitive.

When the SMT adds metadata fields to the header of the simplified message, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not present in the original change event message, the SMT does not add the field to the header.

add.fields.prefix

Default: __ (double-underscore)

Specifies an optional string to prefix to a field name.

add.fields

Default: No default value

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the value element of the simplified Kafka message. When the original message contains duplicate field names, you can identify the specific field to modify by providing the name of the struct together with the name of the field, for example, source.ts_ms.

Optionally, you can override the original name of a field and assign it a new name by adding an entry in the following format to the list:

<field_name>:<new_field_name>.

For example:

  version:VERSION,connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP

The new name values that you specify are case-sensitive.

When the SMT adds metadata fields to the value element of the simplified message, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not present in the original change event message, the SMT still adds the specified field to the value element of the modified message.
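The list format that the add.headers and add.fields options accept can be broken down with a small parser. The Python below is a sketch only, with hypothetical function names. It splits each entry into struct name, field name, and optional new name, and it computes the default output name for entries that are not renamed, following the prefix and underscore rules described above. It deliberately does not compute a final name for renamed entries, because the text does not spell out how the prefix combines with a new name.

```python
def parse_field_spec(spec):
    """Split an add.fields or add.headers value into (struct, field, new_name)."""
    entries = []
    for item in spec.split(","):
        item = item.strip()  # defensive only; the option expects no spaces
        if not item:
            continue
        reference, _, new_name = item.partition(":")
        struct_name, _, field_name = reference.rpartition(".")
        entries.append((struct_name or None, field_name, new_name or None))
    return entries

def default_output_name(struct_name, field_name, prefix="__"):
    """Name of a metadata field that is added without being renamed."""
    if struct_name:
        return f"{prefix}{struct_name}_{field_name}"
    return f"{prefix}{field_name}"

spec = "version:VERSION,connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP"
print(parse_field_spec(spec))
print(default_output_name("source", "ts_ms"))
```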

Known limitations

  • Because MongoDB is a schemaless database, to ensure consistent column definitions when you use Debezium to stream changes to a schema-based relational database, fields within a collection that have the same name must store the same type of data.

  • Configure the SMT to produce messages in the format that is compatible with the sink connector. If a sink connector requires a “flat” message structure, but it receives a message that encodes an array in the source MongoDB document as a struct of structs, the sink connector cannot process the message.