MongoDB-CDC

Overview

The MongoDB CDC connector reads both snapshot data and incremental data from MongoDB. This document describes how to set up the MongoDB CDC connector to run SQL queries against MongoDB.

Supported Version

Extract Node | Version
MongoDB-CDC | MongoDB: >= 3.6

Dependencies

The following provides the dependency information needed to set up the MongoDB CDC connector, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency

  <dependency>
      <groupId>org.apache.inlong</groupId>
      <artifactId>sort-connector-mongodb-cdc</artifactId>
      <version>1.5.0-SNAPSHOT</version>
  </dependency>

Setup MongoDB

Availability

  • MongoDB version

    MongoDB version >= 3.6 is required. The connector uses the change streams feature (new in version 3.6) to capture change data.

  • Cluster Deployment

    A replica set or a sharded cluster is required.

  • Storage Engine

    WiredTiger storage engine is required.

  • Replica set protocol version

    Replica set protocol version 1 (pv1) is required. Starting in version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

  • Privileges

    The changeStream and read privileges are required by the MongoDB Kafka Connector.

    You can use the following example for simple authorization. For more detailed authorization, please refer to MongoDB Database User Roles.

    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, // read role includes changeStream privilege
        { role: "readAnyDatabase", db: "admin" } // for snapshot reading
      ]
    });

How to create a MongoDB Extract Node

Usage for SQL API

The example below shows how to create a MongoDB Extract Node with Flink SQL:

  -- Set checkpoint every 3000 milliseconds
  Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  -- Create a MongoDB table 'mongodb_extract_node' in Flink SQL
  Flink SQL> CREATE TABLE mongodb_extract_node (
      _id STRING, -- must be declared
      name STRING,
      weight DECIMAL(10,3),
      tags ARRAY<STRING>, -- array
      price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
      suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
      PRIMARY KEY(_id) NOT ENFORCED
  ) WITH (
      'connector' = 'mongodb-cdc-inlong',
      'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database' = 'inventory',
      'collection' = 'mongodb_extract_node'
  );
  -- Read snapshot and change stream events from mongodb_extract_node
  Flink SQL> SELECT * FROM mongodb_extract_node;

Note

MongoDB's change event records do not carry an update-before image, so the connector can only convert them to Flink's UPSERT changelog stream. An upsert stream requires a unique key, so _id must be declared as the primary key. No other column can be declared as the primary key, because a delete event contains no keys or values other than _id and the sharding key.
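As an illustration of the note above, the following is a minimal sketch of consuming the upsert stream with a sink that is keyed by the same _id. It assumes the table mongodb_extract_node from the previous example and assumes that Flink's upsert-kafka connector is available on the classpath. The sink table name, topic, and broker address are hypothetical placeholders, not values defined by this connector.

```sql
-- Hypothetical sink table: the name, topic, and broker address are placeholders.
-- The sink declares the same primary key (_id) as the MongoDB source table,
-- so each upsert or delete is applied to the matching key.
CREATE TABLE products_upsert_sink (
    _id STRING,
    name STRING,
    weight DECIMAL(10,3),
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'products',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Continuously write the snapshot and subsequent changes to the keyed sink.
INSERT INTO products_upsert_sink
SELECT _id, name, weight FROM mongodb_extract_node;
```

Only scalar columns are selected here to keep the sketch short; other sinks that accept a primary key could be used in the same way.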

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

MongoDB Extract Node Options

Option | Required | Default | Type | Description
connector | required | (none) | String | Specifies which connector to use; here it should be mongodb-cdc-inlong.
hosts | required | (none) | String | The comma-separated list of hostname and port pairs of the MongoDB servers, e.g. localhost:27017,localhost:27018
username | optional | (none) | String | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
password | optional | (none) | String | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
database | required | (none) | String | Name of the database to watch for changes.
collection | required | (none) | String | Name of the collection in the database to watch for changes.
connection.options | optional | (none) | String | The ampersand-separated connection options of MongoDB, e.g. replicaSet=test&connectTimeoutMS=300000
errors.tolerance | optional | none | String | Whether to continue processing messages if an error is encountered. Accepts none or all. When set to none, the connector reports an error and blocks further processing of the remaining records when it encounters an error. When set to all, the connector silently ignores any bad messages.
errors.log.enable | optional | true | Boolean | Whether details of failed operations should be written to the log file.
copy.existing | optional | true | Boolean | Whether to copy existing data from the source collections.
copy.existing.pipeline | optional | (none) | String | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient, e.g. [{"$match": {"closed": "false"}}] ensures that only documents in which the closed field is set to false are copied.
copy.existing.max.threads | optional | Processors Count | Integer | The number of threads to use when performing the data copy.
copy.existing.queue.size | optional | 16000 | Integer | The maximum size of the queue to use when copying data.
poll.max.batch.size | optional | 1000 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data.
poll.await.time.ms | optional | 1500 | Integer | The amount of time to wait before checking for new results on the change stream.
heartbeat.interval.ms | optional | 0 | Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable.
inlong.metric.labels | optional | (none) | String | InLong metric label. The value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}.
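To show how the optional settings combine with the required ones, here is a minimal sketch of a table definition that sets several of them. The option names come from the table above. The table name, the heartbeat interval, and the groupId, streamId, and nodeId values are hypothetical and should be replaced with values from your own deployment.

```sql
-- Hypothetical table name; the hosts, database, and collection reuse the earlier example.
CREATE TABLE mongodb_extract_node_tuned (
    _id STRING, -- must be declared
    name STRING,
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc-inlong',
    'hosts' = 'localhost:27017,localhost:27018',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'mongodb_extract_node',
    -- Ampersand-separated MongoDB connection options
    'connection.options' = 'replicaSet=test&connectTimeoutMS=300000',
    -- Copy only documents whose closed field is "false" during the initial copy
    'copy.existing' = 'true',
    'copy.existing.pipeline' = '[{"$match": {"closed": "false"}}]',
    -- Send a heartbeat every 5 seconds (assumed value; 0 is the default)
    'heartbeat.interval.ms' = '5000',
    -- Assumed label values, following groupId={groupId}&streamId={streamId}&nodeId={nodeId}
    'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1'
);
```

All option values are written as quoted strings in the WITH clause, including booleans and integers.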

Available Metadata

The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key | DataType | Description
database_name | STRING NOT NULL | Name of the database that contains the row.
collection_name | STRING NOT NULL | Name of the collection that contains the row.
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the snapshot rather than from the change stream, the value is always 0.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

  CREATE TABLE `mongodb_extract_node` (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      _id STRING, -- must be declared
      name STRING,
      weight DECIMAL(10,3),
      tags ARRAY<STRING>, -- array
      price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
      suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
      PRIMARY KEY(_id) NOT ENFORCED
  ) WITH (
      'connector' = 'mongodb-cdc-inlong',
      'hosts' = 'YourHostname:27017',
      'username' = 'YourUsername',
      'password' = 'YourPassword',
      'database' = 'YourDatabase',
      'collection' = 'YourCollection'
  );

Data Type Mapping

BSON type | Flink SQL type
- | TINYINT
- | SMALLINT
Int | INT
Long | BIGINT
- | FLOAT
Double | DOUBLE
Decimal128 | DECIMAL(p, s)
Boolean | BOOLEAN
Date, Timestamp | DATE
Date, Timestamp | TIME
Date | TIMESTAMP(3), TIMESTAMP_LTZ(3)
Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0)
String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING
BinData | BYTES
Object | ROW
Array | ARRAY
DBPointer | ROW<$ref STRING, $id STRING>
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>; …
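The mapping above can be applied when declaring columns. The following is a minimal sketch that assumes a hypothetical collection named stores with an ObjectId _id, a Date field, a BinData field, and a GeoJSON Point field. The collection, table, and column names are illustrative only. The ROW field names are quoted with backticks as a precaution, in case an identifier such as type is treated as a keyword by the SQL parser.

```sql
-- Hypothetical table and collection; column types follow the mapping table above.
CREATE TABLE stores_extract_node (
    _id STRING,                   -- ObjectId -> STRING
    opened_at TIMESTAMP_LTZ(3),   -- Date -> TIMESTAMP_LTZ(3)
    logo BYTES,                   -- BinData -> BYTES
    rating DOUBLE,                -- Double -> DOUBLE
    location ROW<`type` STRING, `coordinates` ARRAY<DOUBLE>>, -- GeoJSON Point
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc-inlong',
    'hosts' = 'localhost:27017',
    'database' = 'inventory',
    'collection' = 'stores'
);
```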