- Debezium connector for MongoDB
- Overview
- How the MongoDB connector works
- Data change events
- Setting up MongoDB
- Deployment
- Monitoring
- MongoDB connector common issues
- Configuration and startup errors
- MongoDB becomes unavailable
- Connector Unable to Start - InvalidResumeToken or ChangeStreamHistoryLost
- Kafka Connect process stops gracefully
- Kafka Connect process crashes
- Kafka becomes unavailable
- Connector fails after it is stopped for a long interval if snapshot.mode is set to initial
- MongoDB loses writes
Debezium connector for MongoDB
Overview
MongoDB’s replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production. The Debezium MongoDB connector captures the changes in a replica set or sharded cluster.
A MongoDB replica set consists of a set of servers that all have copies of the same data. Replication ensures that all changes that clients make to documents on the replica set’s primary are correctly applied to the replica set’s other servers, called secondaries. MongoDB replication works by having the primary record the changes in its oplog (or operation log). Each secondary then reads the primary’s oplog and applies all of the operations, in order, to its own documents. When a new server is added to a replica set, that server first performs a snapshot of all of the databases and collections on the primary, and then reads the primary’s oplog to apply all changes that might have been made since it began the snapshot. The new server becomes a secondary (and is able to handle queries) when it catches up to the tail of the primary’s oplog.
Change streams
Although the Debezium MongoDB connector does not become part of a replica set, it uses a similar replication mechanism to obtain oplog data. The main difference is that the connector does not read the oplog directly. Instead, it delegates the capture and decoding of oplog data to the MongoDB change streams feature. With change streams, the MongoDB server exposes the changes that occur in a collection as an event stream. The Debezium connector monitors the stream and then delivers the changes downstream. The first time that the connector detects a replica set, it examines the oplog to obtain the last recorded transaction, and then performs a snapshot of the primary’s databases and collections. After the connector finishes copying the data, it creates a change stream beginning from the oplog position that it read earlier.
As the MongoDB connector processes changes, it periodically records the position at which the event originated in the oplog stream. When the connector stops, it records the last oplog stream position that it processed, so that after a restart it can resume streaming from that position. In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and always pick up exactly where it left off without losing a single event. Of course, MongoDB oplogs are usually capped at a maximum size, so if the connector is stopped for long periods, operations in the oplog might be purged before the connector has a chance to read them. In this case, after a restart the connector detects the missing oplog operations, performs a snapshot, and then proceeds to stream changes.
The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and of network problems that might cause communication failures. The connector always uses the replica set’s primary node to stream changes, so when the replica set undergoes an election and a different node becomes primary, the connector immediately stops streaming changes, connects to the new primary, and starts streaming changes from the new primary node. Similarly, if the connector is unable to communicate with the replica set primary, it attempts to reconnect (using exponential backoff so as to not overwhelm the network or replica set). After the connection is reestablished, the connector continues to stream changes from the last event that it captured. In this way, the connector dynamically adjusts to changes in replica set membership, and automatically handles communication disruptions.
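The reconnection behavior described above can be pictured with a short sketch of capped exponential backoff. The function name, the initial delay, and the cap are assumptions chosen for illustration; this section does not state the values that the connector actually uses.

```javascript
// Illustrative only: compute capped exponential backoff delays in milliseconds.
// initialDelayMs and maxDelayMs are hypothetical values, not connector defaults.
function backoffDelays(initialDelayMs, maxDelayMs, attempts) {
  const delays = [];
  let delay = initialDelayMs;
  for (let i = 0; i < attempts; i++) {
    delays.push(Math.min(delay, maxDelayMs)); // never wait longer than the cap
    delay *= 2; // double the wait after every failed attempt
  }
  return delays;
}

console.log(backoffDelays(1000, 10000, 5)); // [ 1000, 2000, 4000, 8000, 10000 ]
```

Doubling the delay keeps early retries quick while the cap bounds the wait during a long outage.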
Additional resources
Read Preference
You specify read preferences for a MongoDB connection by setting the readPreference parameter in the mongodb.connection.string.
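As a minimal sketch of what such a connection string might look like, the following helper adds a readPreference option to an existing connection string. The helper name and the host names are hypothetical; readPreference and its value secondaryPreferred are standard MongoDB connection string options.

```javascript
// Add or replace the readPreference option in a MongoDB connection string.
// withReadPreference is a hypothetical helper, not part of Debezium or MongoDB.
function withReadPreference(connectionString, readPreference) {
  const [base, query = ""] = connectionString.split("?");
  const params = new URLSearchParams(query);
  params.set("readPreference", readPreference);
  return `${base}?${params.toString()}`;
}

// Hypothetical hosts mongo1 and mongo2 in a replica set named rs0.
console.log(
  withReadPreference("mongodb://mongo1:27017,mongo2:27017/?replicaSet=rs0", "secondaryPreferred")
);
```

The resulting string is the value that you would assign to mongodb.connection.string in the connector configuration.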
How the MongoDB connector works
An overview of the MongoDB topologies that the connector supports is useful for planning your application.
When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets. Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set. The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.
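To illustrate how several replica sets could share a limited number of tasks, the following sketch distributes replica set names round-robin. This is an assumption made for illustration only; the section does not describe the connector's actual assignment algorithm, and the names shard0 to shard2 are hypothetical.

```javascript
// Illustrative round-robin assignment of replica sets to at most maxTasks tasks.
function assignReplicaSets(replicaSets, maxTasks) {
  const taskCount = Math.max(1, Math.min(maxTasks, replicaSets.length));
  const tasks = Array.from({ length: taskCount }, () => []);
  replicaSets.forEach((rs, i) => tasks[i % taskCount].push(rs));
  return tasks;
}

// Three replica sets, but only two tasks allowed: one task handles two replica sets.
console.log(assignReplicaSets(["shard0", "shard1", "shard2"], 2));
```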
When running the connector against a sharded cluster, use a value of |
Supported MongoDB topologies
The MongoDB connector supports the following MongoDB topologies:
MongoDB replica set
The Debezium MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets require at least three members.
To use the MongoDB connector with a replica set, you must set the value of the mongodb.connection.string property in the connector configuration to the replica set connection string. When the connector is ready to begin capturing changes from a MongoDB change stream, it starts a connection task. The connection task then uses the specified connection string to establish a connection to an available replica set member.
MongoDB sharded cluster
A MongoDB sharded cluster consists of:
- One or more shards, each deployed as a replica set
- A separate replica set that acts as the cluster’s configuration server
- One or more routers (also called mongos) to which clients connect and that route requests to the appropriate shards
To use the MongoDB connector with a sharded cluster, in the connector configuration, set the value of the mongodb.connection.string property to the sharded cluster connection string.
MongoDB standalone server
The MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.
MongoDB does not recommend running a standalone server in production. For more information, see the MongoDB documentation.
Required user permissions
To capture data from MongoDB, Debezium attaches to the database as a MongoDB user. The MongoDB user account that you create for Debezium requires specific database permissions to read from the database. The connector user requires the following permissions:
- Read from the database.
- Run the hello command.
The connector user might also require the following permission:
- Read from the config.shards system collection.
Database read permissions
The connector user must be able to read from all databases, or to read from a specific database, depending on the value of the connector’s capture.scope property. Assign one of the following permissions to the user, depending on the capture.scope setting:
- capture.scope is set to deployment: Grant the user permission to read any database.
- capture.scope is set to database: Grant the user permission to read the database specified by the connector’s capture.target property.
Permission to use the MongoDB hello command
Regardless of the capture.scope setting, the user requires permission to run the MongoDB hello command.
Permission to read the config.shards collection
Depending on your Debezium environment, to enable the connector to perform offset consolidation, you must grant the connector user explicit permission to read the config.shards collection. Permission to read the config.shards collection is required for the following connector environments:
- Connectors upgraded from Debezium 2.5 or earlier.
- Connectors configured to capture changes from a sharded MongoDB cluster.
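The permission rules above could be translated into MongoDB role assignments along the following lines. This is a sketch under assumptions: readAnyDatabase and read are MongoDB built-in roles, but the helper name, the custom role name readConfigShards, and the choice of the find action are illustrative and are not taken from this document. Only the two capture.scope values described above are handled.

```javascript
// Build the roles array for a connector user, based on capture.scope.
// rolesForConnectorUser and the role name "readConfigShards" are hypothetical.
function rolesForConnectorUser(captureScope, captureTarget, needsConfigShards) {
  const roles = [];
  if (captureScope === "deployment") {
    roles.push({ role: "readAnyDatabase", db: "admin" }); // read any database
  } else if (captureScope === "database") {
    roles.push({ role: "read", db: captureTarget }); // read only the capture.target database
  } else {
    throw new Error(`Unsupported capture.scope in this sketch: ${captureScope}`);
  }
  if (needsConfigShards) {
    roles.push({ role: "readConfigShards", db: "admin" });
  }
  return roles;
}

// Hypothetical custom role definition that grants read access to config.shards only.
const readConfigShardsRole = {
  role: "readConfigShards",
  privileges: [{ resource: { db: "config", collection: "shards" }, actions: ["find"] }],
  roles: [],
};

console.log(rolesForConnectorUser("database", "inventory", true));
```

In mongosh, documents of this shape could be passed to db.createRole() and, as the roles field, to db.createUser(). Verify the exact privileges against your MongoDB version before you rely on them.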
Logical connector name
The connector configuration property topic.prefix serves as a logical name for the MongoDB replica set or sharded cluster. The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the change stream position of each replica set.
You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system. We recommend that logical names begin with an alphabetic or underscore character, and that the remaining characters be alphanumeric or underscore.
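The naming recommendation can be checked with a simple regular expression. The function name below is hypothetical, and the check reflects only the recommendation stated above (ASCII letters are assumed), not validation that the connector itself performs.

```javascript
// Returns true if the name starts with a letter or underscore and otherwise
// contains only letters, digits, or underscores.
function isRecommendedLogicalName(name) {
  return /^[A-Za-z_][A-Za-z0-9_]*$/.test(name);
}

console.log(isRecommendedLogicalName("inventory_mongo")); // true
console.log(isRecommendedLogicalName("1st-cluster"));     // false
```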
Offset consolidation
The Debezium MongoDB connector no longer supports replica_set connections to sharded MongoDB deployments. As a result, the offsets recorded by connector versions that used the replica_set connection mode are incompatible with the current version.
To minimize the effects of the connection mode change, and to prevent the connector from running unnecessary snapshots, when the connector restarts after the upgrade, it runs a procedure to consolidate offsets. During this offset consolidation procedure, the connector completes the following steps to reconcile offsets that were recorded by the earlier connector version:
1. Offsets that were recorded by connector versions later than 2.5 are used as-is.
2. Offsets for events that were captured in sharded connection mode from sharded MongoDB deployments, or from MongoDB replica set deployments, are used as-is.
3. Shard-specific offsets that were recorded by connector versions 2.5.x and earlier are used as-is, if both of the following conditions apply:
   - The offsets exist for all current database shards.
   - Offset invalidation is enabled.
   If offset invalidation is disabled, the connector fails to start.
After the connector processes existing offsets in the preceding steps, it resumes streaming changes, and then commits offsets for new events that it captures.
If the offset consolidation procedure does not detect any existing offsets, the connector performs an initial snapshot.
Performing a snapshot
When a Debezium task starts to use a replica set, it uses the connector’s logical name and the replica set name to find an offset that describes the position where the connector previously stopped reading changes. If an offset can be found and it still exists in the oplog, then the task immediately proceeds with streaming changes, starting at the recorded offset position.
However, if no offset is found, or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing a snapshot. This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes a snapshot has been started). The task then proceeds to copy each collection, spawning as many threads as possible (up to the value of the snapshot.max.threads configuration property) to perform this work in parallel. The connector records a separate read event for each document it sees. Each read event contains the object’s identifier, the complete state of the object, and source information about the MongoDB replica set where the object was found. The source information also includes a flag that denotes that the event was produced during a snapshot.
This snapshot will continue until it has copied all collections that match the connector’s filters. If the connector is stopped before the tasks’ snapshots are completed, upon restart the connector begins the snapshot again.
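The startup decision described in the preceding paragraphs can be summarized in a few lines. The function and field names are hypothetical, positions are modeled as plain numbers, and the sketch ignores the effect of the snapshot.mode setting.

```javascript
// Decide how a task starts, based on the stored offset and the oplog contents.
// storedOffset: previously recorded position, or null if none was found.
// oplogContains: hypothetical predicate that reports whether a position is still in the oplog.
function startupAction(storedOffset, oplogContains) {
  if (storedOffset !== null && oplogContains(storedOffset)) {
    return { action: "stream", from: storedOffset };
  }
  // No usable offset: take a snapshot first, then stream changes.
  return { action: "snapshot-then-stream" };
}

// Example: the oplog still holds positions 100 and later.
const stillInOplog = (position) => position >= 100;
console.log(startupAction(150, stillInOplog));  // resumes streaming from 150
console.log(startupAction(50, stillInOplog));   // position was purged, so snapshot
console.log(startupAction(null, stillInOplog)); // no offset, so snapshot
```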
Try to avoid task reassignment and reconfiguration while the connector performs snapshots of any replica sets. The connector generates log messages to report on the progress of the snapshot. To provide for the greatest control, run a separate Kafka Connect cluster for each connector.
Setting | Description |
---|---|
| The connector performs a snapshot every time that it starts. After the snapshot completes, the connector begins to stream event records for subsequent database changes. |
| The connector performs a database snapshot as described in the default workflow for creating an initial snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes. |
| The connector performs a database snapshot. After the snapshot completes, the connector stops, and does not stream event records for subsequent database changes. |
| Deprecated, see |
| The connector captures the structure of all relevant tables, performing all the steps described in the default snapshot workflow, except that it does not create |
| After the connector starts, it performs a snapshot only if it detects one of the following circumstances:
|
| Set the snapshot mode to |
| The |
For more information, see snapshot.mode in the table of connector configuration properties.
Ad hoc snapshots
By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only.
However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing collection data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment:
The connector configuration is modified to capture a different set of collections.
Kafka topics are deleted and must be rebuilt.
Data corruption occurs due to a configuration error or some other problem.
You can re-run a snapshot for a collection for which you previously captured a snapshot by initiating a so-called ad-hoc snapshot. Ad hoc snapshots require the use of signaling collections. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling collection.
When you initiate an ad hoc snapshot of an existing collection, the connector appends content to the topic that already exists for the collection. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled.
Ad hoc snapshot signals specify the collections to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the collections in the database.
You specify the collections to capture by sending an execute-snapshot message to the signaling collection. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the collections to include in the snapshot, as described in the following table:
Field | Default | Value |
---|---|---|
|
| Specifies the type of snapshot that you want to run. |
| N/A | An array that contains regular expressions matching the fully-qualified names of the collection to be snapshotted. |
Triggering an ad hoc incremental snapshot
You initiate an ad hoc incremental snapshot by adding an entry with the execute-snapshot signal type to the signaling collection. After the connector processes the message, it begins the snapshot operation. The snapshot process reads the first and last primary key values and uses those values as the start and end point for each collection. Based on the number of entries in the collection, and the configured chunk size, Debezium divides the collection into chunks, and proceeds to snapshot each chunk, in succession, one at a time.
For more information, see Incremental snapshots.
Triggering an ad hoc blocking snapshot
You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling collection. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified collection, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming.
For more information, see Blocking snapshots.
Incremental snapshots
To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document.
In an incremental snapshot, instead of capturing the full state of a database all at once, as in an initial snapshot, Debezium captures each collection in phases, in a series of configurable chunks. You can specify the collections that you want the snapshot to capture and the size of each chunk. The chunk size determines the number of rows that the snapshot collects during each fetch operation on the database. The default chunk size for incremental snapshots is 1024 rows.
As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each collection row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process:
You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other.
If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the collection from the beginning.
You can run an incremental snapshot on demand at any time, and repeat the process as needed to adapt to database updates. For example, you might re-run a snapshot after you modify the connector configuration to add a collection to its collection.include.list property.
Incremental snapshot process
When you run an incremental snapshot, Debezium sorts each collection by primary key and then splits the collection into chunks based on the configured chunk size. Working chunk by chunk, it then captures each collection row in a chunk. For each row that it captures, the snapshot emits a READ event. That event represents the value of the row when the snapshot for the chunk began.
As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying collection records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka.
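The sort-then-split step can be sketched as follows. This is a simplified in-memory illustration with a hypothetical function name; it uses the _id field as the primary key and does not reflect how the connector actually queries each chunk from the database.

```javascript
// Sort documents by primary key (_id), then split them into fixed-size chunks.
function chunkByPrimaryKey(documents, chunkSize) {
  const sorted = [...documents].sort((a, b) => (a._id < b._id ? -1 : a._id > b._id ? 1 : 0));
  const chunks = [];
  for (let i = 0; i < sorted.length; i += chunkSize) {
    chunks.push(sorted.slice(i, i + chunkSize));
  }
  return chunks;
}

const docs = [{ _id: 5 }, { _id: 1 }, { _id: 4 }, { _id: 2 }, { _id: 3 }];
// With a chunk size of 2, five documents yield three chunks: [1,2], [3,4], [5].
console.log(chunkByPrimaryKey(docs, 2).map((chunk) => chunk.map((d) => d._id)));
```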
How Debezium resolves collisions among records with the same primary key
In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a collection row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka.
Snapshot window
To assist in resolving collisions between late-arriving READ events and streamed events that modify the same collection row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified collection chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot window for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key.
For each data collection, Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a collection are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change.
As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot window, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersedes the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the collection’s Kafka topic.
The connector repeats the process for each snapshot chunk.
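The buffering scheme for a single snapshot window can be simulated in a few lines. This is a simplified model, not Debezium's implementation: the function name and the event shape (key, op, value) are assumptions, with op codes r, u, and c standing for read, update, and create.

```javascript
// Simulate one snapshot window. READ events for the chunk are buffered; a streamed
// event with the same primary key removes the buffered READ because it supersedes it.
function processSnapshotWindow(chunkReadEvents, streamedEvents) {
  const buffer = new Map(chunkReadEvents.map((e) => [e.key, e]));
  const emitted = [];
  for (const event of streamedEvents) {
    buffer.delete(event.key); // discard the stale READ, if one is buffered
    emitted.push(event);      // streamed events are always written to the topic
  }
  // The window closes: emit the READ events that no streamed event superseded.
  emitted.push(...buffer.values());
  return emitted;
}

const reads = [
  { key: 1, op: "r", value: "a" },
  { key: 2, op: "r", value: "b" },
  { key: 3, op: "r", value: "c" },
];
const streamed = [
  { key: 2, op: "u", value: "b2" },
  { key: 9, op: "c", value: "z" },
];
console.log(processSnapshotWindow(reads, streamed).map((e) => `${e.op}:${e.key}`));
```

In this run the READ event for key 2 is dropped, because the streamed update for the same key already carries the newer value.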
Incremental snapshots require that the primary key for each table is stably ordered. For more information about BSON string types in MongoDB, see the MongoDB documentation.
Incremental snapshots for sharded clusters
To use incremental snapshots with sharded MongoDB clusters, you must set specific values for the following properties:
- Set mongodb.connection.mode to sharded.
- Set incremental.snapshot.chunk.size to a value that is high enough to compensate for the increased complexity of change stream pipelines.
Triggering an incremental snapshot
Currently, the only way to initiate an incremental snapshot is to send an ad hoc snapshot signal to the signaling collection on the source database.
You submit a signal to the signaling collection by using the MongoDB insert() method.
After Debezium detects the change in the signaling collection, it reads the signal, and runs the requested snapshot operation.
The query that you submit specifies the collections to include in the snapshot, and, optionally, specifies the type of snapshot operation. Currently, the only valid options for snapshot operations are incremental and blocking.
To specify the collections to include in the snapshot, provide a data-collections array that lists the collections or an array of regular expressions used to match collections, for example:
{"data-collections": ["public.Collection1", "public.Collection2"]}
The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium detects that no action is required and does not perform a snapshot.
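A small helper can assemble a signal document of the shape described above and guard against the empty-array case. The function names are hypothetical; the field names and the two snapshot types come from this section.

```javascript
// Build an execute-snapshot signal document for the signaling collection.
function buildExecuteSnapshotSignal(dataCollections, snapshotType = "incremental") {
  if (!["incremental", "blocking"].includes(snapshotType)) {
    throw new Error(`Unsupported snapshot type: ${snapshotType}`);
  }
  return {
    type: "execute-snapshot",
    data: { "data-collections": dataCollections, type: snapshotType },
  };
}

// An empty data-collections array means that no snapshot is performed.
function willTriggerSnapshot(signal) {
  return signal.data["data-collections"].length > 0;
}

const signal = buildExecuteSnapshotSignal(["public.Collection1", "public.Collection2"]);
console.log(JSON.stringify(signal), willTriggerSnapshot(signal));
```

In mongosh, a document like this could be passed to the insert() method of the signaling collection, for example db.debeziumSignal.insert(signal), where debeziumSignal is whichever collection you set in signal.data.collection.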
If the name of a collection that you want to include in a snapshot contains a dot ( |
Prerequisites
- A signaling data collection exists on the source database.
- The signaling data collection is specified in the signal.data.collection property.
Using a source signaling channel to trigger an incremental snapshot
Insert a snapshot signal document into the signaling collection:
<signalDataCollection>.insert({"id" : <idNumber>, "type" : <snapshotType>, "data" : {"data-collections" : ["<collectionName>", "<collectionName>"], "type": <snapshotType>}});
For example,
db.debeziumSignal.insert({ (1)
"type" : "execute-snapshot", (2) (3)
"data" : {
"data-collections" : ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4)
"type": "incremental"} (5)
});
The values of the id, type, and data parameters in the command correspond to the fields of the signaling collection.
The following table describes the parameters in the example:
Table 3. Descriptions of fields in a MongoDB insert() command for sending an incremental snapshot signal to the signaling collection
Item | Value | Description |
---|---|---|
1 | db.debeziumSignal | Specifies the fully-qualified name of the signaling collection on the source database. |
2 | null | The _id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. The insert method in the preceding example omits use of the optional _id parameter. Because the document does not explicitly assign a value for the parameter, the arbitrary id that MongoDB automatically assigns to the document becomes the id identifier for the signal request. Use this string to identify logging messages to entries in the signaling collection. Debezium does not use this identifier string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. |
3 | execute-snapshot | The type parameter specifies the operation that the signal is intended to trigger. |
4 | data-collections | A required component of the data field of a signal that specifies an array of collection names or regular expressions to match collection names to include in the snapshot. The array lists regular expressions which match collections by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling collection in the signal.data.collection configuration property. |
5 | incremental | An optional type component of the data field of a signal that specifies the type of snapshot operation to run. Currently supports the incremental and blocking types. If you do not specify a value, the connector runs an incremental snapshot. |
The following example shows the JSON for an incremental snapshot event that is captured by a connector.
Example: Incremental snapshot event message
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" (1)
},
"op":"r", (2)
"ts_ms":"1620393591654",
"ts_us":"1620393591654962",
"ts_ns":"1620393591654962147",
"transaction":null
}
Item | Field name | Description |
---|---|---|
1 | snapshot | Specifies the type of snapshot operation to run. |
2 | op | Specifies the event type. |
Using the Kafka signaling channel to trigger an incremental snapshot
You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot.
The key of the Kafka message must match the value of the topic.prefix connector configuration option.
The value of the message is a JSON object with type and data fields.
The signal type is execute-snapshot, and the data field must have the following fields:
Field | Default | Value | ||
---|---|---|---|---|
|
| The type of the snapshot to be executed. Currently Debezium supports the | ||
| N/A | An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. | ||
| N/A | An optional string that specifies a condition that the connector evaluates to designate a subset of records to include in a snapshot.
| ||
| N/A | An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. |
An example of the execute-snapshot Kafka message:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Ad hoc incremental snapshots with additional-conditions
Debezium uses the additional-conditions field to select a subset of a collection’s content.
Typically, when Debezium runs a snapshot, it runs a SQL query such as:
SELECT * FROM _<tableName>_ ….
When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example:
SELECT * FROM _<data-collection>_ WHERE _<filter>_ ….
For example, given a products collection with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
You can use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products collection as in the previous example, if you want a snapshot to include only the content from the products collection for which color='blue' and brand='MyBrand', you could send the following request:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
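The two example messages above differ only in the filter string, so a helper can generate them. The sketch below builds the key and value of such a Kafka message; the function name is hypothetical, and producing the message to the signaling topic is left to whichever Kafka client you use.

```javascript
// Build the key and value of an execute-snapshot Kafka signal with one additional condition.
// topicPrefix must match the connector's topic.prefix setting.
function buildFilteredSnapshotMessage(topicPrefix, dataCollection, filter) {
  return {
    key: topicPrefix,
    value: JSON.stringify({
      type: "execute-snapshot",
      data: {
        "data-collections": [dataCollection],
        type: "INCREMENTAL",
        "additional-conditions": [{ "data-collection": dataCollection, filter: filter }],
      },
    }),
  };
}

const message = buildFilteredSnapshotMessage(
  "test_connector",
  "schema1.products",
  "color='blue' AND brand='MyBrand'"
);
console.log(message.key);
console.log(message.value);
```

Serializing with JSON.stringify avoids quoting mistakes when the filter itself contains quotation marks.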
Stopping an incremental snapshot
You can also stop an incremental snapshot by sending a signal to the collection on the source database. You submit a stop snapshot signal by inserting a document into the signaling collection. After Debezium detects the change in the signaling collection, it reads the signal, and stops the incremental snapshot operation if it’s in progress.
The query that you submit specifies the snapshot operation of incremental, and, optionally, the collections of the current running snapshot to be removed.
Prerequisites
- A signaling data collection exists on the source database.
- The signaling data collection is specified in the signal.data.collection property.
Using a source signaling channel to stop an incremental snapshot
Insert a stop snapshot signal document into the signaling collection:
<signalDataCollection>.insert({"id" : <idNumber>, "type" : "stop-snapshot", "data" : {"data-collections" : ["<collectionName>", "<collectionName>"], "type": "incremental"}});
For example,
db.debeziumSignal.insert({ (1)
"type" : "stop-snapshot", (2) (3)
"data" : {
"data-collections" : ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4)
"type": "incremental"} (5)
});
The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling collection.
The following table describes the parameters in the example:
Table 5. Descriptions of fields in an insert command for sending a stop incremental snapshot document to the signaling collection
Item | Value | Description |
---|---|---|
1 | db.debeziumSignal | Specifies the fully-qualified name of the signaling collection on the source database. |
2 | null | The insert method in the preceding example omits use of the optional _id parameter. Because the document does not explicitly assign a value for the parameter, the arbitrary id that MongoDB automatically assigns to the document becomes the id identifier for the signal request. Use this string to identify logging messages to entries in the signaling collection. Debezium does not use this identifier string. |
3 | stop-snapshot | The type parameter specifies the operation that the signal is intended to trigger. |
4 | data-collections | An optional component of the data field of a signal that specifies an array of collection names or regular expressions to match collection names to remove from the snapshot. The array lists regular expressions which match collections by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling collection in the signal.data.collection configuration property. If this component of the data field is omitted, the signal stops the entire incremental snapshot that is in progress. |
5 | incremental | A required component of the data field of a signal that specifies the type of snapshot operation that is to be stopped. Currently, the only valid option is incremental. If you do not specify a type value, the signal fails to stop the incremental snapshot. |
Using the Kafka signaling channel to stop an incremental snapshot
You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot.
The key of the Kafka message must match the value of the topic.prefix connector configuration option.
The value of the message is a JSON object with type and data fields.
The signal type is stop-snapshot, and the data field must have the following fields:
Field | Default | Value |
---|---|---|
type | incremental | The type of the snapshot to be stopped. Currently Debezium supports only the incremental type. |
data-collections | N/A | An optional array of comma-separated regular expressions that match the fully-qualified names of the collections to remove from the snapshot. |
The following example shows a typical stop-snapshot Kafka message:
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
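The key and value shown above can be assembled programmatically before they are handed to a Kafka producer. The following is a minimal sketch in Python, not part of Debezium: the helper name build_stop_snapshot_signal is hypothetical, and the code only builds the strings, leaving the actual send to whichever producer client you use.

```python
import json


def build_stop_snapshot_signal(topic_prefix, collections=None):
    """Return the (key, value) strings for a stop-snapshot signal message.

    Hypothetical helper: it only formats the message described above.
    """
    data = {"type": "INCREMENTAL"}
    if collections:
        # data-collections is optional, so it is added only when provided.
        data["data-collections"] = list(collections)
    value = json.dumps({"type": "stop-snapshot", "data": data})
    # The message key must match the connector's topic.prefix value.
    return topic_prefix, value


key, value = build_stop_snapshot_signal(
    "test_connector", ["schema1.table1", "schema1.table2"]
)
print("Key =", key)
print("Value =", value)
```

Keeping the message construction separate from the producer call makes it easy to check the payload shape in a unit test before anything is sent to the signaling topic.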
Custom snapshotter SPI
For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces:
io.debezium.snapshot.spi.Snapshotter
Controls whether the connector takes a snapshot.
io.debezium.snapshot.spi.SnapshotQuery
Controls how data is queried during a snapshot.
io.debezium.snapshot.spi.SnapshotLock
Controls whether the connector locks tables when taking a snapshot.
io.debezium.snapshot.spi.Snapshotter interface. All built-in snapshot modes implement this interface.
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} if the schema can be recovered when the database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} if the snapshot should be re-executed when there is a gap in the data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} if, when a connector resumes and takes a snapshot,
     * streaming should resume from where it previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
io.debezium.snapshot.spi.SnapshotQuery interface. All built-in snapshot query modes implement this interface.
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot query.
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}
io.debezium.snapshot.spi.SnapshotLock interface. All built-in snapshot lock modes implement this interface.
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);
}
Blocking snapshots
To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector.
A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time.
You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations:
You add a new collection and you want to complete the snapshot while the connector is running.
You add a large collection, and you want the snapshot to complete in less time than is possible with an incremental snapshot.
Blocking snapshot process
When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified collection, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed.
Configure snapshot
You can set the following properties in the data component of a signal:
data-collections: Specifies the collections to include in the snapshot.
additional-conditions: Specifies different filters for different collections.
The data-collection property is the fully-qualified name of the collection to which the filter applies.
The filter property has the same value that is used in snapshot.select.statement.overrides.
For example:
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Possible duplicates
A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot.
Streaming changes
After the connector task for a replica set records an offset, it uses the offset to determine the position in the oplog where it should start streaming changes. The task then (depending on the configuration) either connects to the replica set’s primary node or connects to a replica-set-wide change stream and starts streaming changes from that position. It processes all insert, update, and delete operations, and converts them into Debezium change events. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. The interval at which the offset is recorded is governed by offset.flush.interval.ms, which is a Kafka Connect worker configuration property.
When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector continues exactly where it left off. If the connector’s tasks terminate unexpectedly, however, the tasks might have processed and generated events after they last recorded the offset, but before a newer offset could be recorded. Upon restart, the connector begins at the last recorded offset, possibly regenerating some of the same events that it generated just before the crash.
When all components in a Kafka pipeline operate nominally, Kafka consumers receive every message exactly once. However, when things go wrong, Kafka can only guarantee that consumers receive every message at least once. To avoid unexpected results, consumers must be able to handle duplicate messages.
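One way for a consumer to tolerate at-least-once delivery is to remember the last position it processed for each document key and to skip anything at or before that position. The sketch below assumes a simplified, hypothetical event shape (a dict with a key and a source block), and it assumes that the pair of source.ts_ms and source.ord increases for successive changes to the same document. It keeps state in memory only, so a real consumer would need to persist it.

```python
def deduplicate(events):
    """Yield events, skipping any that were already seen for the same key.

    Assumption: (source.ts_ms, source.ord) orders the changes to one document.
    """
    last_seen = {}  # document key -> highest (ts_ms, ord) position processed
    for event in events:
        key = event["key"]
        source = event["source"]
        position = (source["ts_ms"], source["ord"])
        if key in last_seen and position <= last_seen[key]:
            continue  # replayed event, already handled
        last_seen[key] = position
        yield event


events = [
    {"key": "1004", "source": {"ts_ms": 1558965508000, "ord": 31}},
    {"key": "1004", "source": {"ts_ms": 1558965509000, "ord": 1}},
    # The same event delivered again after a restart from an older offset.
    {"key": "1004", "source": {"ts_ms": 1558965509000, "ord": 1}},
]
unique = list(deduplicate(events))
print(len(unique))
```

An alternative design is to make the downstream write itself idempotent, for example an upsert keyed by the document identifier, so that replayed events overwrite rather than duplicate.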
As mentioned earlier, the connector tasks always use the replica set’s primary node to stream changes from the oplog, ensuring that the connector sees the most up-to-date operations possible and can capture the changes with lower latency than if secondaries were to be used instead. When the replica set elects a new primary, the connector immediately stops streaming changes, connects to the new primary, and starts streaming changes from the new primary node at the same position. Likewise, if the connector experiences any problems communicating with the replica set members, it tries to reconnect by using exponential backoff so as to not overwhelm the replica set, and once connected it continues streaming changes from where it last left off. In this way, the connector is able to dynamically adjust to changes in replica set membership and automatically handle communication failures.
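The reconnect-with-exponential-backoff behavior can be illustrated with a short generic sketch. This is not the connector's implementation: the function name, the parameter names, and the default delays are all hypothetical, and the connect callable stands in for whatever opens the connection.

```python
import time


def connect_with_backoff(connect, initial_delay=1.0, max_delay=120.0,
                         max_attempts=8, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            sleep(delay)
            # Double the delay, but never wait longer than max_delay.
            delay = min(delay * 2, max_delay)


# Usage: a fake connection that fails twice before succeeding.
attempts = {"count": 0}
waits = []


def flaky_connect():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("replica set member unreachable")
    return "connected"


result = connect_with_backoff(flaky_connect, sleep=waits.append)
print(result, waits)
```

Passing the sleep function as a parameter keeps the example testable without real waiting; capping the delay prevents the wait from growing without bound during a long outage.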
To summarize, the MongoDB connector continues running in most situations. Communication problems might cause the connector to wait until the problems are resolved.
Pre-image support
In MongoDB 6.0 and later, you can configure change streams to emit the pre-image state of a document to populate the before field for MongoDB change events. To enable the use of pre-images in MongoDB, you must set changeStreamPreAndPostImages for a collection by using db.createCollection(), create, or collMod. To enable the Debezium MongoDB connector to include pre-images in change events, set the capture.mode for the connector to one of the *_with_pre_image options.
Size limits on MongoDB change stream events
The size of a MongoDB change stream event is limited to 16 megabytes. The use of pre-images thus increases the likelihood of exceeding this threshold, which can lead to failures. For information about how to avoid exceeding the change stream limit, see the MongoDB documentation.
Topic names
The MongoDB connector writes events for all insert, update, and delete operations to documents in each collection to a single Kafka topic. The name of each Kafka topic takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the topic.prefix configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.
For example, consider a MongoDB replica set with an inventory database that contains four collections: products, products_on_hand, customers, and orders. If the connector monitoring this database were given a logical name of fulfillment, then the connector would produce events on these four Kafka topics:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
Notice that the topic names do not incorporate the replica set name or shard name. As a result, all changes to a sharded collection (where each shard contains a subset of the collection’s documents) go to the same Kafka topic.
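The naming rule can be written down as a one-line function. The following sketch simply joins the three parts with dots, as described above; the function name topic_name is hypothetical and is shown only to make the rule concrete.

```python
def topic_name(logical_name, database_name, collection_name):
    """Compose a topic name as logicalName.databaseName.collectionName."""
    return f"{logical_name}.{database_name}.{collection_name}"


collections = ["products", "products_on_hand", "customers", "orders"]
topics = [topic_name("fulfillment", "inventory", c) for c in collections]
for topic in topics:
    print(topic)
```

Because neither the replica set name nor the shard name is an input, documents from every shard of a collection map to the same topic.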
You can set up Kafka to auto-create the topics as they are needed. If not, then you must use Kafka administration tools to create the topics before starting the connector.
Partitions
The MongoDB connector does not make any explicit determination about how to partition topics for events. Instead, it allows Kafka to determine how to partition topics based on event keys. You can change Kafka’s partitioning logic by defining the name of the Partitioner implementation in the Kafka Connect worker configuration.
Kafka maintains total order only for events written to a single topic partition. Partitioning the events by key does mean that all events with the same key always go to the same partition. This ensures that all events for a specific document are always totally ordered.
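To see why key-based partitioning keeps all events for one document in order, consider a simplified model in which the partition is a hash of the key modulo the partition count. The sketch below uses CRC32 purely as a stand-in hash, which is an assumption for illustration; Kafka's default partitioner uses a different hash function, so the partition numbers here will not match a real cluster.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition deterministically (stand-in for Kafka's hash)."""
    return zlib.crc32(key) % num_partitions


key_1004 = b'{"id":"1004"}'
key_1005 = b'{"id":"1005"}'

# Every event for the same document key lands in the same partition.
first = partition_for(key_1004, 6)
second = partition_for(key_1004, 6)
other = partition_for(key_1005, 6)
print(first == second, 0 <= other < 6)
```

The mapping depends on the partition count, so changing the number of partitions of an existing topic can move a key to a different partition and break per-document ordering across the change.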
Transaction Metadata
Debezium can generate events that represent transaction metadata boundaries and enrich change data event messages.
Limits on when Debezium receives transaction metadata
Debezium registers and receives metadata only for transactions that occur after you deploy the connector. Metadata for transactions that occur before you deploy the connector is not available.
For every transaction BEGIN and END, Debezium generates an event that contains the following fields:
status
BEGIN or END
id
String representation of the unique transaction identifier.
event_count (for END events)
Total number of events emitted by the transaction.
data_collections (for END events)
An array of pairs of data_collection and event_count that provides the number of events emitted by changes originating from the given data collection.
The following example shows a typical message:
{
"status": "BEGIN",
"id": "1462833718356672513",
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "1462833718356672513",
"event_count": 2,
"data_collections": [
{
"data_collection": "rs0.testDB.collectiona",
"event_count": 1
},
{
"data_collection": "rs0.testDB.collectionb",
"event_count": 1
}
]
}
Unless overridden via the topic.transaction option, transaction events are written to the topic named <topic.prefix>.transaction.
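A consumer of the transaction topic can use the END event as a sanity check. The sketch below assumes that the per-collection counts in data_collections are expected to add up to the total event_count, which follows from the field descriptions above but is an assumption of this example; the function name is hypothetical.

```python
def end_event_is_consistent(event):
    """Check that per-collection counts in an END event sum to event_count."""
    if event["status"] != "END":
        raise ValueError("expected a transaction END event")
    per_collection_total = sum(
        entry["event_count"] for entry in event["data_collections"]
    )
    return per_collection_total == event["event_count"]


end_event = {
    "status": "END",
    "id": "1462833718356672513",
    "event_count": 2,
    "data_collections": [
        {"data_collection": "rs0.testDB.collectiona", "event_count": 1},
        {"data_collection": "rs0.testDB.collectionb", "event_count": 1},
    ],
}
print(end_event_is_consistent(end_event))
```

The same totals let a consumer decide when it has received every change event that belongs to a transaction before it acts on them as a unit.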
Change data event enrichment
When transaction metadata is enabled, the data message Envelope is enriched with a new transaction field. This field provides information about every event in the form of a composite of fields:
id
String representation of unique transaction identifier.
total_order
The absolute position of the event among all events generated by the transaction.
data_collection_order
The per-data collection position of the event among all events that were emitted by the transaction.
Following is an example of what a message looks like:
{
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335486",
"ts_ns": "1580390884335486281",
"transaction": {
"id": "1462833718356672513",
"total_order": "1",
"data_collection_order": "1"
}
}
Data change events
The Debezium MongoDB connector generates a data change event for each document-level operation that inserts, updates, or deletes data. Each event contains a key and a value. The structure of the key and the value depends on the collection that was changed.
Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A schema field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
Item | Field name | Description |
---|---|---|
1 |
| The first |
2 |
| The first |
3 |
| The second |
4 |
| The second |
By default, the connector streams change event records to topics with names that are the same as the event’s originating collection. See topic names.
The MongoDB connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the database and collection names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or _. If there is an invalid character, it is replaced with an underscore character. This can lead to unexpected conflicts if the logical server name, a database name, or a collection name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
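The replacement rule, and the naming conflicts it can cause, are easy to reproduce. The following sketch applies the rule as stated above; it is an illustration, not the connector's own code, and the function name and its leading parameter are hypothetical.

```python
import re


def sanitize(name: str, leading: bool = False) -> str:
    """Replace characters that are not valid in an Avro name with underscores.

    Set leading=True for the logical server name, whose first character
    must be a letter or an underscore.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if leading and cleaned and not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned[1:]
    return cleaned


# Two distinct database names collapse into the same sanitized name.
print(sanitize("my-db"), sanitize("my.db"))
# A leading digit in the logical server name is replaced.
print(sanitize("1server", leading=True))
```

Checking names with a function like this before deployment can reveal conflicts where two databases or collections differ only in characters that would be replaced.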
Change event keys
A change event’s key contains the schema for the changed document’s key and the changed document’s actual key. For a given collection, both the schema and its corresponding payload contain a single id field. The value of this field is the document’s identifier represented as a string that is derived from MongoDB extended JSON serialization strict mode.
Consider a connector with a logical name of fulfillment, a replica set containing an inventory database, and a customers collection that contains documents such as the following.
Example document
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
Example change event key
Every change event that captures a change to the customers collection has the same event key schema. For as long as the customers collection has the previous definition, every change event that captures a change to the customers collection has the following key structure. In JSON, it looks like this:
{
"schema": { (1)
"type": "struct",
"name": "fulfillment.inventory.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"field": "id",
"type": "string",
"optional": false
}
]
},
"payload": { (5)
"id": "1004"
}
}
Item | Field name | Description |
---|---|---|
1 |
| The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s |
2 |
| Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the key for the document that was changed. Key schema names have the format connector-name.database-name.collection-name.
|
3 |
| Indicates whether the event key must contain a value in its |
4 |
| Specifies each field that is expected in the |
5 |
| Contains the key for the document for which this change event was generated. In this example, the key contains a single |
This example uses a document with an integer identifier, but any valid MongoDB document identifier works the same way, including an identifier that is itself a document. For a document identifier, an event key’s payload.id value is a string that represents the updated document’s original _id field as a MongoDB extended JSON serialization that uses strict mode. The following table provides examples of how different types of _id fields are represented.
Type | MongoDB _id Value | Key’s payload |
---|---|---|
Integer | 1234 |
|
Float | 12.34 |
|
String | “1234” |
|
Document |
|
|
ObjectId |
|
|
Binary |
|
|
Change event values
The value in a change event is a bit more complicated than the key. Like the key, the value has a schema section and a payload section. The schema section contains the schema that describes the Envelope structure of the payload section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
Consider the same sample document that was used to show an example of a change event key:
Example document
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
The value portion of a change event for a change to this document is described for each event type:
create events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the customers collection:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json", (2)
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "fulfillment.inventory.customers.Envelope" (4)
},
"payload": { (5)
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", (6)
"source": { (7)
"version": "2.6.2.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c", (8)
"ts_ms": 1558965515240, (9)
"ts_us": 1558965515240142, (10)
"ts_ns": 1558965515240142879 (11)
}
}
Item | Field name | Description |
---|---|---|
1 |
| The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular collection. |
2 |
| In the |
3 |
|
|
4 |
|
|
5 |
| The value’s actual data. This is the information that the change event is providing. |
6 |
| An optional field that specifies the state of the document after the event occurred. In this example, the |
7 |
| Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:
|
8 |
| Mandatory string that describes the type of operation that caused the connector to generate the event. In this example,
|
9 |
| Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
10 |
| Optional field that displays the time at which the connector processed the event, in microseconds. The time is based on the system clock in the JVM running the Kafka Connect task. |
11 |
| Optional field that displays the time at which the connector processed the event, in nanoseconds. The time is based on the system clock in the JVM running the Kafka Connect task. |
update events
Change streams capture mode
The value of a change event for an update in the sample customers collection has the same schema as a create event for that collection. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. An update event includes an after value only if the capture.mode option is set to change_streams_update_full. A before value is provided if the capture.mode option is set to one of the *_with_pre_image options. In this case, there is a new structured field, updateDescription, with a few additional fields:
updatedFields is a string field that contains the JSON representation of the updated document fields with their values
removedFields is a list of field names that were removed from the document
truncatedArrays is a list of arrays in the document that were truncated
Here is an example of a change event value in an event that the connector generates for an update in the customers collection:
{
"schema": { ... },
"payload": {
"op": "u", (1)
"ts_ms": 1465491461815, (2)
"ts_us": 1465491461815698, (2)
"ts_ns": 1465491461815698142, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (3)
"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (4)
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"first_name\": \"Anne Marie\"}", (5)
"truncatedArrays": null
},
"source": { (6)
"version": "2.6.2.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1,
"h": null,
"tord": null,
"stxnid": null,
"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
"txnNumber":1
}
}
}
Item | Field name | Description |
---|---|---|
1 |
| Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, |
2 |
| Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
3 |
| Contains the JSON string representation of the actual MongoDB document before change. An update event value does not contain an |
4 |
| Contains the JSON string representation of the actual MongoDB document. |
5 |
| Contains the JSON string representation of the updated field values of the document. In this example, the update changed the |
6 |
| Mandatory field that describes the source metadata for the event. This field contains the same information as a create event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
|
The If your application depends on gradual change evolution then you should rely on |
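A consumer that tracks document state from the updateDescription field can apply each update to its local copy. The sketch below is a simplified illustration with a hypothetical function name. It assumes that updatedFields and removedFields refer to top-level fields only, and it ignores truncatedArrays; nested field paths would need additional handling.

```python
import json


def apply_update_description(document, update_description):
    """Return a copy of document with the described update applied.

    Assumption: only top-level fields are updated or removed.
    """
    updated = dict(document)
    updated_fields = update_description.get("updatedFields")
    if updated_fields:
        # updatedFields is a JSON string, so it must be parsed first.
        updated.update(json.loads(updated_fields))
    for name in update_description.get("removedFields") or []:
        updated.pop(name, None)
    return updated


before = {
    "_id": 1004,
    "first_name": "unknown",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org",
}
description = {
    "removedFields": None,
    "updatedFields": "{\"first_name\": \"Anne Marie\"}",
    "truncatedArrays": None,
}
after = apply_update_description(before, description)
print(after["first_name"])
```

The function returns a new dict rather than mutating its input, which makes it straightforward to compare the state before and after each update.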
delete events
The value in a delete change event has the same schema portion as create and update events for the same collection. The payload portion in a delete event contains values that are different from create and update events for the same collection. In particular, a delete event contains neither an after value nor an updateDescription value. Here is an example of a delete event for a document in the customers collection:
{
"schema": { ... },
"payload": {
"op": "d", (1)
"ts_ms": 1465495462115, (2)
"ts_us": 1465495462115748, (2)
"ts_ns": 1465495462115748263, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(3)
"source": { (4)
"version": "2.6.2.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
Item | Field name | Description |
---|---|---|
1 |
| Mandatory string that describes the type of operation. The |
2 |
| Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
3 |
| Contains the JSON string representation of the actual MongoDB document before change. An update event value does not contain an |
4 |
| Mandatory field that describes the source metadata for the event. This field contains the same information as a create or update event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
|
MongoDB connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
Tombstone events
All MongoDB connector events for a uniquely identified document have exactly the same key. When a document is deleted, the delete event value still works with log compaction because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that key, the message value must be null. To make this possible, after Debezium’s MongoDB connector emits a delete event, the connector emits a special tombstone event that has the same key but a null value. A tombstone event informs Kafka that all messages with that same key can be removed.
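The effect of a tombstone can be shown with a toy model of log compaction. The sketch below is a deliberate simplification and not Kafka's actual algorithm: it keeps only the latest message per key and then drops keys whose latest value is null, ignoring segment boundaries and tombstone retention time.

```python
def compact(log):
    """Keep the latest message per key; drop keys whose latest value is None."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later messages replace earlier ones
    survivors = sorted(
        (offset, key, value)
        for key, (offset, value) in latest.items()
        if value is not None
    )
    return [(key, value) for _, key, value in survivors]


log_with_tombstone = [
    ("1004", "create"),
    ("1005", "create"),
    ("1004", "update"),
    ("1004", "delete"),
    ("1004", None),  # tombstone emitted after the delete event
]
log_without_tombstone = log_with_tombstone[:-1]

print(compact(log_with_tombstone))
print(compact(log_without_tombstone))
```

With the tombstone, nothing remains for key 1004; without it, the delete event itself stays in the topic as the latest message for that key.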
Setting up MongoDB
The MongoDB connector uses MongoDB’s change streams to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set. See the MongoDB documentation for setting up a replica set or sharded cluster. Also, be sure to understand how to enable access control and authentication with replica sets.
You must also have a MongoDB user that has the appropriate roles to read the admin database where the oplog can be read. Additionally, the user must also be able to read the config database in the configuration server of a sharded cluster and must have the listDatabases privilege action. When change streams are used (the default), the user must also have the cluster-wide privilege actions find and changeStream.
When you intend to use pre-images to populate the before field, you must first enable changeStreamPreAndPostImages for a collection by using db.createCollection(), create, or collMod.
MongoDB in the Cloud
You can use the Debezium connector for MongoDB with MongoDB Atlas. Note that MongoDB Atlas only supports secure connections via SSL, that is, the mongodb.ssl.enabled connector option must be set to true.
Optimal Oplog Config
The Debezium MongoDB connector reads change streams to obtain oplog data for a replica set. Because the oplog is a fixed-sized, capped collection, if it exceeds its maximum configured size, it begins to overwrite its oldest entries. If the connector is stopped for any reason, when it restarts, it attempts to resume streaming from the last oplog stream position. However, if the last stream position was removed from the oplog, depending on the value specified in the connector’s snapshot.mode property, the connector might fail to start, reporting an invalid resume token error. In the event of a failure, you must create a new connector to enable Debezium to continue capturing records from the database. For more information, see Connector fails after it is stopped for a long interval if snapshot.mode is set to initial.
To ensure that the oplog retains the offset values that Debezium requires to resume streaming, you can use either of the following approaches:
Increase the size of the oplog. Based on your typical workloads, set the oplog size to a value that is greater than the peak number of oplog entries per hour.
Increase the minimum number of hours that an oplog entry is retained (MongoDB 4.4 and greater). This setting is time-based, such that entries in the last n hours are guaranteed to be available even if the oplog reaches its maximum configured size. Although this is generally the preferred option, for clusters with high workloads that are nearing capacity, specify the maximum oplog size.
To help prevent failures that are related to missing oplog entries, it’s important to track metrics that report replication behavior, and to optimize the oplog size to support Debezium. In particular, you should monitor the values of Oplog GB/Hour and Replication Oplog Window. If Debezium is offline for an interval that exceeds the value of the replication oplog window, and the primary oplog grows faster than Debezium can consume entries, a connector failure can result.
For information about how to monitor these metrics, see the MongoDB documentation.
It’s best to set the maximum oplog size to a value that is based on the anticipated hourly growth of the oplog (Oplog GB/Hour), multiplied by the time that might be required to address a Debezium failure.
That is, Oplog GB/Hour X average reaction time to Debezium failure.
For example, if the oplog size limit is set to 1GB, and the oplog grows by 3GB per hour, oplog entries are cleared three times per hour. If Debezium were to fail during this time, its last oplog position is likely to be removed.
If the oplog grows at the rate of 3GB/hour, and Debezium is offline for two hours, you would thus set the oplog size to 3GB/hour X 2 hours, or 6GB.
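The sizing rule above can be sketched in a few lines of Python. The helper names are illustrative. The second helper builds a replSetResizeOplog admin command, which takes the size in megabytes and, on MongoDB 4.4 and later, an optional minRetentionHours value; treat the exact values as placeholders for your own measurements.

```python
from typing import Optional


def required_oplog_size_gb(growth_gb_per_hour: float, outage_hours: float) -> float:
    """Oplog GB/Hour multiplied by the time needed to react to a Debezium failure."""
    return growth_gb_per_hour * outage_hours


def resize_oplog_command(size_gb: float, min_retention_hours: Optional[float] = None) -> dict:
    """Build a replSetResizeOplog command; MongoDB expects the size in megabytes."""
    cmd = {"replSetResizeOplog": 1, "size": float(size_gb * 1024)}
    if min_retention_hours is not None:
        # Time-based retention is available in MongoDB 4.4 and later.
        cmd["minRetentionHours"] = float(min_retention_hours)
    return cmd


size_gb = required_oplog_size_gb(3, 2)   # 3 GB/hour, 2 hours offline
resize_cmd = resize_oplog_command(size_gb, min_retention_hours=2)
print(size_gb, resize_cmd)
```

The command would be run against the admin database of each replica set member; this sketch only constructs it.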
Deployment
To deploy a Debezium MongoDB connector, you install the Debezium MongoDB connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
Prerequisites
Apache Zookeeper, Apache Kafka, and Kafka Connect are installed.
MongoDB is installed and is set up to work with the Debezium connector.
Procedure
Download the connector’s plug-in archive.
Extract the JAR files into your Kafka Connect environment.
Add the directory with the JAR files to Kafka Connect’s plugin.path.
Restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see Debezium’s Container images for Apache Zookeeper, Apache Kafka, and Kafka Connect with the MongoDB connector already installed and ready to run.
You can also run Debezium on Kubernetes and OpenShift.
The Debezium tutorial walks you through using these images, and this is a great way to learn about Debezium.
MongoDB connector configuration example
Following is an example of the configuration for a connector instance that captures data from a MongoDB replica set rs0 at port 27017 on 192.168.99.100, which we logically name fullfillment. Typically, you configure the Debezium MongoDB connector in a JSON file by setting the configuration properties that are available for the connector.
You can choose to produce events for a particular MongoDB replica set or sharded cluster. Optionally, you can filter out collections that are not needed.
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", (2)
"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", (3)
"topic.prefix": "fullfillment", (4)
"collection.include.list": "inventory[.]*" (5)
}
}
1 | The name of our connector when we register it with a Kafka Connect service. |
2 | The name of the MongoDB connector class. |
3 | The connection string to use to connect to the MongoDB replica set. |
4 | The logical name of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used. |
5 | A list of regular expressions that match the collection namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional. |
For the complete list of the configuration properties that you can set for the Debezium MongoDB connector, see MongoDB connector configuration properties.
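Before you deploy an include list, it can help to check which namespaces a pattern actually selects. The sketch below assumes that the connector applies each expression as an anchored match against the whole <dbName>.<collectionName> string, and it uses Python's re module as a stand-in for the connector's Java regular expressions, which behave the same for simple patterns like these. The helper names and sample namespaces are illustrative.

```python
import re


def matches_namespace(pattern: str, namespace: str) -> bool:
    """Anchored match: the pattern must cover the entire namespace string."""
    return re.fullmatch(pattern, namespace) is not None


def is_included(include_list: str, namespace: str) -> bool:
    """Naive comma split; patterns that themselves contain commas are not handled."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(matches_namespace(p, namespace) for p in patterns)


print(is_included("inventory[.].*", "inventory.customers"))   # whole namespace covered
print(is_included("inventory", "inventory.customers"))        # database name alone
print(is_included("inventory[.]orders,sales[.].*", "sales.invoices"))
```

Under the anchoring assumption, an expression that names only the database or only the collection does not select the namespace, because a substring match is not enough.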
You can send this configuration with a POST command to a running Kafka Connect service. The service records the configuration and starts one connector task that performs the following actions:
Connects to the MongoDB replica set or sharded cluster.
Assigns tasks for each replica set.
Performs a snapshot, if necessary.
Reads the change stream.
Streams change event records to Kafka topics.
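As a rough illustration of the POST step, the following Python sketch prepares the registration request for the Kafka Connect REST API by using only the standard library. The endpoint http://localhost:8083/connectors is an assumption about where your Kafka Connect service listens; the payload repeats the example configuration shown earlier. The request is built but not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # assumed Kafka Connect endpoint

connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0",
        "topic.prefix": "fullfillment",
        "collection.include.list": "inventory[.]*",
    },
}


def build_register_request(payload: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Create a POST request that carries the connector configuration as JSON."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_register_request(connector)
print(request.get_method(), request.full_url)

# To submit the configuration to a running service:
#   with urllib.request.urlopen(request) as response:
#       print(response.status)
```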
Adding connector configuration
To start running a Debezium MongoDB connector, create a connector configuration, and add the configuration to your Kafka Connect cluster.
Prerequisites
The Debezium MongoDB connector is installed.
Procedure
Create a configuration for the MongoDB connector.
Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.
Results
After the connector starts, it completes the following actions:
Performs a consistent snapshot of the collections in your MongoDB replica sets.
Reads the change streams for the replica sets.
Produces change events for every inserted, updated, and deleted document.
Streams change event records to Kafka topics.
Connector properties
The Debezium MongoDB connector has numerous configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:
Required Debezium MongoDB connector configuration properties
Advanced Debezium MongoDB connector configuration properties
The following configuration properties are required unless a default value is available.
Property | Default | Description | ||
---|---|---|---|---|
false | Set this property to
| |||
No default | Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.) | |||
No default | The name of the Java class for the connector. Always use a value of | |||
No default | Specifies a connection string that the connector uses to connect to a MongoDB replica set. This property replaces the | |||
No default | A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. Use only alphanumeric characters, hyphens, dots and underscores to form the name. The logical name should be unique across all other connectors, because the name is used as the prefix in naming the Kafka topics that receive records from this connector.
| |||
DefaultMongoDbAuthProvider | A full Java class name that is an implementation of the io.debezium.connector.mongodb.connection.MongoDbAuthProvider interface. This class handles setting the credentials on the MongoDB connection (called on each app boot). Default behavior uses the mongodb.user, mongodb.password, and mongodb.authsource properties according to each of their documentation, but other implementations may use them differently or ignore them altogether. Note that any setting in mongodb.connection.string will override settings set by this class | |||
No default | When using default mongodb.authentication.class: Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | |||
No default | When using default mongodb.authentication.class: Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. | |||
| When using default mongodb.authentication.class: Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with another authentication database than | |||
| Connector will use SSL to connect to MongoDB instances. | |||
| When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If | |||
regex | The mode used to match events based on included/excluded database and collection names. Set the property to one of the following values:
| |||
empty string | An optional comma-separated list of regular expressions or literals that match database names to be monitored. By default, all databases are monitored. To match the name of a database, Debezium performs one of the following actions based on the value of filters.match.mode property
If you include this property in the configuration, do not also set the | |||
empty string | An optional comma-separated list of regular expressions or literals that match database names to be excluded from monitoring. When To match the name of a database, Debezium performs one of the following actions based on the value of filters.match.mode property
If you include this property in the configuration, do not set the | |||
empty string | An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be monitored. By default, the connector monitors all collections except those in the To match the name of a namespace, Debezium performs one of the following actions based on the value of filters.match.mode property
If you include this property in the configuration, do not also set the | |||
empty string | An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring. When To match the name of a namespace, Debezium performs one of the following actions based on the value of filters.match.mode property
If you include this property in the configuration, do not set the | |||
| Specifies the method that the connector uses to capture
| |||
| Specifies the scope of the change streams that the connector opens. Set this property to one of the following values:
| |||
Specifies the database that the connector monitors for changes. This property applies only if the capture.scope is set to | ||||
empty string | An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters. | |||
empty string | An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, and the colon character (:) is used to determine the rename mapping of the field. The next field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path. | |||
| Controls whether a delete event is followed by a tombstone event. | |||
none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings:
| |||
none | Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings:
See Avro naming for more details. |
The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.
Property | Default | Description |
---|---|---|
| Specifies how the connector looks up the full value of an updated document when the capture.mode is set retrieve full documents. The connector retrieves full documents when its
To use this option with a MongoDB change streams collection, you must configure the collection to return document pre- and post-images. Pre- and post-images for an operation are available only if the required configuration is in place before the operation occurs. Set this property to one of the following values:
| |
| Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048. | |
| Positive integer value that specifies the maximum number of records that the blocking queue can hold. When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka. The blocking queue can provide backpressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable. Events that are held in the queue are disregarded when the connector periodically records offsets. Always set the value of | |
| A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value. | |
| Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds, or 0.5 second. | |
| Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms). | |
| Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms). | |
| Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16, which with the defaults for | |
v2 | Schema version for the | |
| Controls how frequently heartbeat messages are sent. Set this parameter to | |
| A comma-separated list of operation types that will be skipped during streaming. The operations include: | |
No default | Controls which collection items are included in snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form databaseName.collectionName. For each collection that you specify, also specify another configuration property: | |
No default | An interval in milliseconds that the connector should wait before taking a snapshot after starting up; | |
| Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size. | |
All collections specified in | An optional, comma-separated list of regular expressions that match the fully-qualified names ( To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the schema; it does not match substrings that might be present in a schema name. | |
| Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1. | |
initial | Specifies the criteria for performing a snapshot when the connector starts. Set the property to one of the following values:
For more information, see custom snapshotter SPI. | |
false | If the | |
false | If the | |
false | If the | |
false | If the | |
false | If the | |
No default | If | |
| When set to See Transaction Metadata for additional details. | |
10000 (10 seconds) | The number of milliseconds to wait before restarting a connector after a retriable error occurs. | |
| The interval in which the connector polls for new, removed, or changed replica sets. | |
10000 (10 seconds) | The number of milliseconds the driver will wait before a new connection attempt is aborted. | |
10000 (10 seconds) | The frequency that the cluster monitor attempts to reach each server. | |
0 | The number of milliseconds before a send/receive on the socket can take before a timeout occurs. A value of | |
30000 (30 seconds) | The number of milliseconds the driver will wait to select a server before it times out and throws an error. | |
No default | When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. This can be used to customize the data that the connector consumes. The value of this property must be an array of permitted aggregation pipeline stages in JSON format. Note that this is appended after the internal pipeline used to support the connector (e.g. filtering operation types, database names, collection names, etc.). | |
internal_first | The order used to construct the effective MongoDB aggregation stream pipeline. Set the property to one of the following values:
| |
fail | The strategy used to handle change events for documents exceeding specified BSON size. Set the property to one of the following values:
| |
0 | The maximum allowed size in bytes of the stored document for which change events are processed. This includes the size both before and after the database operation. More specifically, this limits the size of the fullDocument and fullDocumentBeforeChange fields of MongoDB change events. | |
| Specifies the maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception. A value of | |
No default | Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: | |
source | List of the signaling channel names that are enabled for the connector. By default, the following channels are available:
| |
No default | List of notification channel names that are enabled for the connector. By default, the following channels are available:
| |
| The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment. | |
| Specifies the watermarking mechanism that the connector uses during an incremental snapshot to deduplicate events that might be captured by an incremental snapshot and then recaptured after streaming resumes.
| |
| The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event etc., defaults to | |
| Specify the delimiter for topic name, defaults to | |
| The size used for holding the topic names in bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection. | |
| Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern: | |
| Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern: | |
| Accepts key-value pairs that customize the MBean object name. The connector appends the specified tags to the end of the regular name. Each key represents a tag for the MBean object name, and the corresponding value represents the value of that tag. For example: | |
| Specifies how the connector responds after an operation that results in a retriable error, such as a connection error.
|
Debezium connector Kafka signals configuration properties
Debezium provides a set of signal.* properties that control how the connector interacts with the Kafka signals topic.
The following table describes the Kafka signal properties.
Property | Default | Description | ||
---|---|---|---|---|
<topic.prefix>-signal | The name of the Kafka topic that the connector monitors for ad hoc signals.
| |||
kafka-signal | The name of the group ID that is used by Kafka consumers. | |||
No default | A list of host/port pairs that the connector uses for establishing an initial connection to the Kafka cluster. Each pair references the Kafka cluster that is used by the Debezium Kafka Connect process. | |||
| An integer value that specifies the maximum number of milliseconds that the connector waits when polling signals. | |||
| Enable the offset commit for the signal topic in order to guarantee at-least-once delivery. If disabled, only signals received when the consumer is up and running are processed. Any signals received when the consumer is down are lost. |
Debezium connector pass-through signals Kafka consumer client configuration properties
The Debezium connector provides for pass-through configuration of the signals Kafka consumer. Pass-through signals properties begin with the prefix signal.consumer.*. For example, the connector passes properties such as signal.consumer.security.protocol=SSL to the Kafka consumer.
Debezium strips the prefixes from the properties before it passes the properties to the Kafka signals consumer.
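The prefix handling can be pictured with a small Python sketch. It is an illustration of the idea rather than the connector's implementation. The prefix follows the signal.consumer.security.protocol example above, and the sample configuration values are made up.

```python
SIGNAL_CONSUMER_PREFIX = "signal.consumer."


def signal_consumer_properties(connector_config: dict) -> dict:
    """Return the pass-through properties with the prefix removed."""
    return {
        key[len(SIGNAL_CONSUMER_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(SIGNAL_CONSUMER_PREFIX)
    }


config = {
    "topic.prefix": "fullfillment",
    "signal.consumer.security.protocol": "SSL",
    "signal.consumer.max.poll.records": "50",  # illustrative value
}
consumer_props = signal_consumer_properties(config)
print(consumer_props)
```

Properties that do not carry the prefix, such as topic.prefix here, are left out of the consumer configuration.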
Debezium connector sink notifications configuration properties
The following table describes the notification properties.
Property | Default | Description |
---|---|---|
No default | The name of the topic that receives notifications from Debezium. This property is required when you configure the notification.enabled.channels property to include |
Monitoring
The Debezium MongoDB connector has two metric types in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect have.
Snapshot metrics provide information about connector operation while performing a snapshot.
Streaming metrics provide information about connector operation when the connector is capturing changes and streaming change event records.
The Debezium monitoring documentation provides details about how to expose these metrics by using JMX.
Snapshot Metrics
The MBean is debezium.mongodb:type=connector-metrics,context=snapshot,server=<topic.prefix>,task=<task.id>.
Snapshot metrics are not exposed unless a snapshot operation is active, or if a snapshot has occurred since the last connector start.
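When you configure a JMX client or exporter, you typically need the concrete object name rather than the template. The following sketch fills in the template for a given context, topic prefix, and task ID; the function name and the sample values are illustrative, and values that contain characters with special meaning in JMX object names might need quoting, which this sketch does not attempt.

```python
def connector_mbean_name(context: str, topic_prefix: str, task_id: int) -> str:
    """Fill in the connector-metrics MBean name template."""
    return (
        "debezium.mongodb:type=connector-metrics,"
        f"context={context},server={topic_prefix},task={task_id}"
    )


snapshot_mbean = connector_mbean_name("snapshot", "fullfillment", 0)
streaming_mbean = connector_mbean_name("streaming", "fullfillment", 0)
print(snapshot_mbean)
print(streaming_mbean)
```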
The following table lists the snapshot metrics that are available.
Attributes | Type | Description |
---|---|---|
| The last snapshot event that the connector has read. | |
| The number of milliseconds since the connector has read and processed the most recent event. | |
| The total number of events that this connector has seen since last started or reset. | |
| The number of events that have been filtered by include/exclude list filtering rules configured on the connector. | |
| The list of tables that are captured by the connector. | |
| The length of the queue used to pass events between the snapshotter and the main Kafka Connect loop. | |
| The free capacity of the queue used to pass events between the snapshotter and the main Kafka Connect loop. | |
| The total number of tables that are being included in the snapshot. | |
| The number of tables that the snapshot has yet to copy. | |
| Whether the snapshot was started. | |
| Whether the snapshot was paused. | |
| Whether the snapshot was aborted. | |
| Whether the snapshot completed. | |
| The total number of seconds that the snapshot has taken so far, even if not complete. Also includes the time when the snapshot was paused. | |
| The total number of seconds that the snapshot was paused. If the snapshot was paused several times, the paused time adds up. | |
| Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table. | |
| The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value. | |
| The current volume, in bytes, of records in the queue. |
The Debezium MongoDB connector also provides the following custom snapshot metrics:
Attribute | Type | Description |
---|---|---|
|
| Number of database disconnects. |
Streaming Metrics
The MBean is debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,task=<task.id>.
The following table lists the streaming metrics that are available.
Attributes | Type | Description |
---|---|---|
| The last streaming event that the connector has read. | |
| The number of milliseconds since the connector has read and processed the most recent event. | |
| The total number of events that this connector has seen since the last start or metrics reset. | |
| The total number of create events that this connector has seen since the last start or metrics reset. | |
| The total number of update events that this connector has seen since the last start or metrics reset. | |
| The total number of delete events that this connector has seen since the last start or metrics reset. | |
| The number of events that have been filtered by include/exclude list filtering rules configured on the connector. | |
| The list of tables that are captured by the connector. | |
| The length of the queue used to pass events between the streamer and the main Kafka Connect loop. | |
| The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop. | |
| Flag that denotes whether the connector is currently connected to the database server. | |
| The number of milliseconds between the last change event’s timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the connector are running. | |
| The number of processed transactions that were committed. | |
| The coordinates of the last received event. | |
| Transaction identifier of the last processed transaction. | |
| The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value. | |
| The current volume, in bytes, of records in the queue. |
The Debezium MongoDB connector also provides the following custom streaming metrics:
Attribute | Type | Description |
---|---|---|
|
| Number of database disconnects. |
|
| Number of primary node elections. |
MongoDB connector common issues
Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. When the system is operating normally and is managed carefully, then Debezium provides exactly once delivery of every change event.
If a fault occurs, the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In such situations, Debezium, like Kafka, provides at least once delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error or exception in the log, and stops running:
The connector’s configuration is invalid.
The connector cannot successfully connect to MongoDB by using the specified connection parameters.
After a failure, the connector attempts to reconnect by using exponential backoff. You can configure the maximum number of reconnection attempts.
In these cases, the error will have more details about the problem and possibly a suggested workaround. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.
MongoDB becomes unavailable
Once the connector is running, if the primary node of any of the MongoDB replica sets becomes unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.
The attempts to reconnect are controlled by three properties:
connect.backoff.initial.delay.ms - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).
connect.backoff.max.delay.ms - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).
connect.max.attempts - The maximum number of attempts before an error is produced, with a default of 16.
Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.
Reconnection attempt number | Delay before attempt, in seconds | Total delay before attempt, in minutes and seconds |
---|---|---|
1 | 1 | 00:01 |
2 | 2 | 00:03 |
3 | 4 | 00:07 |
4 | 8 | 00:15 |
5 | 16 | 00:31 |
6 | 32 | 01:03 |
7 | 64 | 02:07 |
8 | 120 | 04:07 |
9 | 120 | 06:07 |
10 | 120 | 08:07 |
11 | 120 | 10:07 |
12 | 120 | 12:07 |
13 | 120 | 14:07 |
14 | 120 | 16:07 |
15 | 120 | 18:07 |
16 | 120 | 20:07 |
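The table can be reproduced for other settings with a short calculation. The sketch below models only the arithmetic described above (each delay doubles until it reaches the maximum); it is not the connector's own code, and the function name is illustrative.

```python
def backoff_schedule(initial_ms: int = 1000, max_ms: int = 120_000, max_attempts: int = 16):
    """Return (attempt, delay_seconds, total_seconds) for each reconnection attempt."""
    rows = []
    delay_ms = initial_ms
    total_ms = 0
    for attempt in range(1, max_attempts + 1):
        total_ms += delay_ms
        rows.append((attempt, delay_ms // 1000, total_ms // 1000))
        # Double the delay for the next attempt, capped at the maximum delay.
        delay_ms = min(delay_ms * 2, max_ms)
    return rows


schedule = backoff_schedule()
for attempt, delay_s, total_s in schedule:
    print(f"{attempt:2d} | {delay_s:3d} | {total_s // 60:02d}:{total_s % 60:02d}")
```

Passing different values for initial_ms, max_ms, or max_attempts shows how long the connector would keep retrying before it fails under a non-default configuration.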
Connector Unable to Start - InvalidResumeToken or ChangeStreamHistoryLost
A connector that is stopped for a long period fails to start, and reports the following exception:
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
The preceding exception indicates that the entry that corresponds to the connector’s resume token is no longer present in the oplog. Because the oplog no longer contains the last offset that the connector processed, the connector cannot resume streaming.
You can use either of the following options to recover from the failure:
Delete the failed connector, and create a new connector with the same configuration but with a different connector name.
Pause the connector and then remove offsets, or change the offset topic.
To help prevent failures related to missing resume tokens, optimize configuration of the oplog.
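The first recovery option (delete the failed connector, then register the same configuration under a new name) could be scripted against the Kafka Connect REST API roughly as follows. The base URL, the new connector name, and the helper name are assumptions for illustration. The sketch only builds the two requests and does not send them.

```python
import copy
import json
import urllib.parse
import urllib.request


def recreate_requests(old: dict, new_name: str, base_url: str = "http://localhost:8083"):
    """Build a DELETE for the failed connector and a POST for its renamed replacement."""
    old_name = urllib.parse.quote(old["name"], safe="")
    delete = urllib.request.Request(f"{base_url}/connectors/{old_name}", method="DELETE")

    replacement = copy.deepcopy(old)   # same configuration ...
    replacement["name"] = new_name     # ... under a different connector name
    create = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(replacement).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    return delete, create


failed = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0",
        "topic.prefix": "fullfillment",
    },
}
delete_req, create_req = recreate_requests(failed, "inventory-connector-v2")
print(delete_req.get_method(), delete_req.full_url)
print(create_req.get_method(), create_req.full_url)
```

Each request would be sent with urllib.request.urlopen, the DELETE first and the POST after it succeeds.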
Kafka Connect process stops gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that process Kafka Connect will migrate all of the process’ connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.
Kafka Connect process crashes
If the Kafka Connect process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the MongoDB connectors will resume from the last offset recorded by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium changes are idempotent, so a sequence of events always results in the same state. Debezium also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event’s unique transaction identifier ( |
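The following sketch illustrates why redelivered events are harmless to a consumer that applies them as upserts and deletes keyed by document identifier. The event shape is deliberately simplified and is an assumption for the example: each event carries only an op code (c, u, r, or d), a hypothetical id field, and the document state in after. Real Debezium events contain more fields.

```python
def apply_events(state, events):
    """Apply simplified change events to a dict keyed by document id."""
    for event in events:
        if event["op"] == "d":
            state.pop(event["id"], None)      # deleting twice is harmless
        else:                                 # "c", "u", or "r"
            state[event["id"]] = event["after"]  # upsert overwrites prior value
    return state


events = [
    {"op": "c", "id": 1, "after": {"qty": 5}},
    {"op": "u", "id": 1, "after": {"qty": 7}},
    {"op": "c", "id": 2, "after": {"qty": 1}},
    {"op": "d", "id": 2, "after": None},
]

once = apply_events({}, events)
# Simulate a crash recovery that redelivers the last three events.
replayed = apply_events(dict(once), events[1:])

print(once)              # {1: {'qty': 7}}
print(once == replayed)  # True
```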
Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Kafka Connect also periodically records the latest offset that appears in those change events, at a frequency that you specify in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process that runs the connectors repeatedly attempts to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be reestablished, at which point the connectors resume exactly where they left off.
Connector fails after it is stopped for a long interval if snapshot.mode is set to initial
If the connector is gracefully stopped, users might continue to perform operations on replica set members. Changes that occur while the connector is offline continue to be recorded in MongoDB’s oplog. In most cases, after the connector is restarted, it reads the offset value in the oplog to determine the last operation that it streamed for each replica set, and then resumes streaming changes from that point. After the restart, database operations that occurred while the connector was stopped are emitted to Kafka as usual, and after some time, the connector catches up with the database. The amount of time required for the connector to catch up depends on the capabilities and performance of Kafka and the volume of changes that occurred in the database.
However, if the connector remains stopped for a long enough interval, MongoDB might purge the oplog while the connector is inactive, which results in the loss of information about the connector's last position. After the connector restarts, it cannot resume streaming, because the oplog no longer contains the previous offset value that marks the last operation that the connector processed. The connector also cannot perform a snapshot, as it typically would when the snapshot.mode property is set to initial and no offset value is present. In this case, a mismatch exists, because the oplog does not contain the value of the previous offset, but the offset value is present in the connector's internal Kafka offsets topic. An error results and the connector fails.
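The three possible startup outcomes described above can be summarized as a small decision function. This is an illustrative model of the documented behavior when snapshot.mode is set to initial, not the connector's actual implementation. The function and argument names are hypothetical.

```python
def startup_action(has_stored_offset, oplog_contains_offset):
    """Model the startup decision for snapshot.mode=initial."""
    if not has_stored_offset:
        # No offset in the Kafka offsets topic: take an initial snapshot.
        return "snapshot"
    if oplog_contains_offset:
        # The stored offset is still present in the oplog: resume from it.
        return "resume streaming"
    # An offset is stored, but the oplog no longer contains it: the mismatch.
    return "fail"


print(startup_action(False, False))  # snapshot
print(startup_action(True, True))    # resume streaming
print(startup_action(True, False))   # fail
```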
To recover from the failure, delete the failed connector, and create a new connector with the same configuration but with a different connector name. When you start the new connector, it performs a snapshot to ingest the state of the database, and then resumes streaming.
MongoDB loses writes
In certain failure situations, MongoDB can lose commits, which results in the MongoDB connector being unable to capture the lost changes. For example, if the primary crashes suddenly after it applies a change and records the change to its oplog, the oplog might become unavailable before secondary nodes can read its contents. As a result, the secondary node that is elected as the new primary node might be missing the most recent changes from its oplog.
At this time, there is no way to prevent this side effect in MongoDB.