MongoDB Outbox Event Router
Example outbox message
To understand how to configure the Debezium MongoDB outbox event router SMT, consider the following example of a Debezium outbox message:
# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
A Debezium connector that is configured to apply the MongoDB outbox event router SMT generates the preceding message by transforming a raw Debezium change event message as in the following example:
# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
"patch": null,
"after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
"source": {
"version": "1.9.6.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c",
"ts_ms": 1556890294484
}
This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox collection structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.
Basic outbox collection
To apply the default MongoDB outbox event router SMT configuration, your outbox collection is assumed to have the following fields:
{
"_id": "objectId",
"aggregatetype": "string",
"aggregateid": "objectId",
"type": "string",
"payload": "object"
}
Field | Effect |
---|---|
| Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages. |
Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default | |
| Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. |
| A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value is solely comprised of the |
Additional custom fields | Any additional fields from the outbox collection can be added to outbox events either within the payload section or as a message header. |
Basic configuration
To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter
SMT. The following example shows the basic configuration for the SMT in a .properties
file:
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
Customizing the configuration
The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation only to events that originate in the outbox collection, define an SMT predicate statement that selectively applies the transformation to those events only.
Options for applying the transformation selectively
In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:
Use the route.topic.regex configuration option for the SMT.
Using Avro as the payload format
The MongoDB outbox event router SMT supports arbitrary payload formats. The payload
field value in an outbox collection is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer
class to serialize GenericRecord
instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
By default, the payload
field value (the Avro data) is the only message value. Configuration of ByteArrayConverter
as the value converter propagates the payload
field value as-is into the Kafka message value.
Note that this differs from the ByteBufferConverter
suggested for other SMTs. This is due to the different approach MongoDB takes to storing byte arrays internally.
The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). These events cannot be serialized by the ByteArrayConverter
so additional configuration must be provided so the converter knows how to serialize these events. As an example, the following configuration illustrates using the Apache Kafka JsonConverter
with no schemas:
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
The delegate Converter
implementation is specified by the delegate.converter.type
option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false
.
Emitting messages with additional fields
Your outbox collection might contain fields whose values you want to add to the emitted outbox messages. For example, consider an outbox collection that has a value of purchase-order
in the aggregatetype
field and another field, eventType
, whose possible values are order-created
and order-shipped
. Additional fields can be added with the syntax field:placement:alias
.
The allowed values for placement
are: - header
- envelope
- partition
To emit the eventType
field value in the outbox message header, configure the SMT like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type
The result will be a header on the Kafka message with type
as its key, and the value of the eventType
field as its value.
To emit the eventType
field value in the outbox message envelope, configure the SMT like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type
To control which partition the outbox message is produced on, configure the SMT like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionField:partition
Note that for the partition
placement, adding an alias will have no effect.
Expanding escaped JSON string as JSON
By default, the payload
of the Debezium outbox message is represented as a string. When the original source of the string is in JSON format, the resulting Kafka message uses escape sequences to represent the string, as shown in the following example:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
You can configure the outbox event router to expand the message content, converting the escaped JSON back to its original, unescaped JSON format. In the converted string, the companion schema is deduced from the original JSON document. The following examples shows the expanded JSON in the resulting Kafka message:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}
To enable string conversion in the transformation, set the value of collection.expand.json.payload
to true
and use the StringConverter
as shown in the following example:
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter
Configuration options
The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.
Option | Default | Group | Description |
---|---|---|---|
| Collection | Determines the behavior of the SMT when there is an update operation on the outbox collection. Possible settings are:
All changes in an outbox collection are expected to be an insert or delete operation. That is, an outbox collection functions as a queue; updates to documents in an outbox collection are not allowed. The SMT automatically filters out delete operations (for removing proceeded outbox events) on an outbox collection. | |
| Collection | Specifies the outbox collection field that contains the unique event ID. This ID will be stored in the emitted event’s headers under the | |
| Collection | Specifies the outbox collection field that contains the event key. When this field contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. | |
Collection | By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox collection field that contains the timestamp that you want to be in emitted outbox messages. | ||
| Collection | Specifies the outbox collection field that contains the event payload. | |
| Collection | Specifies whether the JSON expansion of a String payload should be done. If no content found or in case of parsing error, the content is kept “as is”. | |
Collection, Envelope | Specifies one or more outbox collection fields that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a field and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:
To specify an alias for the field, specify a trio with the alias as the third value, for example:
The second value is the placement and it must always be Configuration examples are in emitting additional fields in Debezium outbox messages. | ||
Collection, Schema | When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc. | ||
| Router | Specifies the name of a field in the outbox collection. By default, the value specified in this field becomes a part of the name of the topic to which the connector emits the outbox messages. For an example, see the description of the expected outbox collection. | |
| Router | Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox collection documents. This regular expression is part of the setting of the route.topic.replacement SMT option. + The default behavior is that the SMT replaces the default | |
| Router | Specifies the name of the topic to which the connector emits outbox messages. The default topic name is + To change the topic name, you can:
| |
| Router | Indicates whether an empty or | |
| Tracing | The name of the field containing tracing span context. | |
| Tracing | The operation name representing the Debezium processing span. | |
| Tracing | When |
Distributed tracing
The outbox event routing SMT has support for distributed tracing. See tracing documentation for more details.