MongoDB SQL Connector

Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Append & Upsert Mode

The MongoDB connector allows for reading data from and writing data into MongoDB. This document describes how to set up the MongoDB connector to run SQL queries against MongoDB.

The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.

If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.

Dependencies

To use the MongoDB connector, the following dependencies are required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

There is no connector (yet) available for Flink version 1.20.

The MongoDB connector is not part of the binary distribution. See the documentation on how to link with it for cluster execution.

How to create a MongoDB table

The MongoDB table can be defined as follows:

  -- register a MongoDB table 'users' in Flink SQL
  CREATE TABLE MyUserTable (
    _id STRING,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://user:password@127.0.0.1:27017',
    'database' = 'my_db',
    'collection' = 'users'
  );

  -- write data into the MongoDB table from the other table "T"
  INSERT INTO MyUserTable
  SELECT _id, name, age, status FROM T;

  -- scan data from the MongoDB table
  SELECT _id, name, age, status FROM MyUserTable;

  -- temporal join the MongoDB table as a dimension table
  SELECT * FROM myTopic
  LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
  ON myTopic.key = MyUserTable._id;

Connector Options

| Option | Required | Forwarded | Default | Type | Description |
|---|---|---|---|---|---|
| connector | required | no | (none) | String | Specifies which connector to use; here it should be 'mongodb'. |
| uri | required | yes | (none) | String | The MongoDB connection URI. |
| database | required | yes | (none) | String | The name of the MongoDB database to read from or write to. |
| collection | required | yes | (none) | String | The name of the MongoDB collection to read from or write to. |
| scan.fetch-size | optional | yes | 2048 | Integer | Gives the reader a hint as to the number of documents that should be fetched from the database per round-trip when reading. |
| scan.cursor.no-timeout | optional | yes | true | Boolean | The MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed. |
| scan.partition.strategy | optional | no | default | String | Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded and default. See the Partitioned Scan section below for more details. |
| scan.partition.size | optional | no | 64mb | MemorySize | Specifies the memory size of each partition. |
| scan.partition.samples | optional | no | 10 | Integer | Specifies the number of samples per partition. It only takes effect when the partition strategy is sample. The sample partitioner samples the collection, then projects and sorts by the partition fields. It then uses every scan.partition.samples-th sample as a value for calculating the partition boundaries. The total number of samples taken is calculated as: samples per partition * (count of documents / number of documents per partition). |
| lookup.cache | optional | no | NONE | Enum (possible values: NONE, PARTIAL) | The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operation in the external database). |
| lookup.partial-cache.max-rows | optional | no | (none) | Long | The maximum number of rows in the lookup cache; beyond this value, the oldest rows are expired. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
| lookup.partial-cache.expire-after-write | optional | no | (none) | Duration | The maximum time to live for each row in the lookup cache after it is written into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
| lookup.partial-cache.expire-after-access | optional | no | (none) | Duration | The maximum time to live for each row in the lookup cache after the entry is accessed. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
| lookup.partial-cache.caching-missing-key | optional | no | true | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table. "lookup.cache" must be set to "PARTIAL" to use this option. |
| lookup.max-retries | optional | no | 3 | Integer | The maximum number of retries if a database lookup fails. |
| lookup.retry.interval | optional | no | 1s | Duration | Specifies the retry interval if looking up records from the database fails. |
| sink.buffer-flush.max-rows | optional | yes | 1000 | Integer | Specifies the maximum number of buffered rows per batch request. |
| sink.buffer-flush.interval | optional | yes | 1s | Duration | Specifies the batch flush interval. |
| sink.max-retries | optional | yes | 3 | Integer | The maximum number of retries if writing records to the database fails. |
| sink.retry.interval | optional | yes | 1s | Duration | Specifies the retry interval if writing records to the database fails. |
| sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the MongoDB sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
| sink.delivery-guarantee | optional | no | at-least-once | Enum (possible values: none, at-least-once) | Optional delivery guarantee when committing. The exactly-once guarantee is not supported yet. |
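
To show how several of these options combine in practice, here is a minimal sketch of a table definition that overrides a few scan and sink defaults. The table name, collection name, and chosen values are illustrative assumptions, not recommendations; only the option keys come from the table above.

```sql
-- Hypothetical table: names and values are for illustration only.
CREATE TABLE TunedUserTable (
  _id STRING,
  name STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'users',
  -- fetch fewer documents per round-trip than the default of 2048
  'scan.fetch-size' = '1024',
  -- flush a batch after 500 buffered rows or every 2 seconds
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  -- retry failed writes up to 5 times, waiting 2 seconds between attempts
  'sink.max-retries' = '5',
  'sink.retry.interval' = '2s',
  'sink.delivery-guarantee' = 'at-least-once'
);
```

Options left out of the WITH clause keep the defaults listed in the table.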

Features

Key handling

The MongoDB sink can work in either upsert mode or append mode, depending on whether a primary key is defined. If a primary key is defined, the MongoDB sink works in upsert mode and can consume queries containing UPDATE/DELETE messages. If no primary key is defined, the MongoDB sink works in append mode and can only consume queries containing INSERT-only messages.
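
As an illustration of append mode, here is a minimal sketch of a sink table declared without a primary key. The table name UserEvents, its columns, and the source table Clicks are hypothetical and only serve the example.

```sql
-- Hypothetical append-only sink: there is no PRIMARY KEY clause, so the
-- connector only accepts INSERT-only input.
CREATE TABLE UserEvents (
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'user_events'
);

-- Clicks is an assumed insert-only source table; a plain projection like
-- this one produces only INSERT messages.
INSERT INTO UserEvents
SELECT user_id, event_type, event_time FROM Clicks;
```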

In MongoDB the primary key is used to calculate the MongoDB document _id. Its value must be unique and immutable in the collection, and may be of any BSON Type other than an Array. If the _id contains subfields, the subfield names cannot begin with a ($) symbol.

There are also some constraints on the primary key index. Before MongoDB 4.2, the total size of an index entry, which can include structural overhead depending on the BSON type, must be less than 1024 bytes. Starting in version 4.2, MongoDB removes the Index Key Limit. For more details, refer to Index Key Limit in the MongoDB documentation.

The MongoDB connector generates a document _id for every row by combining all primary key fields in the order defined in the DDL.

  • When there is only a single field in the specified primary key, the connector converts the field data to a BSON value and uses it as the _id of the corresponding document.
  • When there are multiple fields in the specified primary key, the connector converts and combines these fields into a BSON document and uses it as the _id of the corresponding document. For example, given the primary key statement PRIMARY KEY (f1, f2) NOT ENFORCED, the extracted _id takes the form _id: {f1: v1, f2: v2}.
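
The composite-key case can be sketched as follows. The table OrderItems and its columns are hypothetical; the shape of the resulting _id in the comment simply follows the rule described in the list above.

```sql
-- Hypothetical table with a two-field primary key.
CREATE TABLE OrderItems (
  order_id STRING,
  item_id STRING,
  quantity INT,
  PRIMARY KEY (order_id, item_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'order_items'
);

-- Following the rule above, a row with order_id = 'o-1' and item_id = 'i-7'
-- is expected to be stored with
--   _id: {order_id: 'o-1', item_id: 'i-7'}
-- because the key fields are combined in the order given in the DDL.
```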

Note that the mapping is ambiguous if an _id field exists in the DDL but the primary key is not declared on _id. Either use the _id column as the key, or rename the _id column.

See CREATE TABLE DDL for more details about PRIMARY KEY syntax.

Partitioned Scan

To speed up reading with parallel source task instances, Flink provides a partitioned scan feature for MongoDB collections. The following partition strategies are provided:

  • single: treats the entire collection as a single partition.
  • sample: samples the collection and generates partitions from the samples. This is fast, but the partitions may be uneven.
  • split-vector: uses the splitVector command to generate partitions for non-sharded collections. This is fast and produces even partitions. The splitVector permission is required.
  • sharded: reads config.chunks directly as the partitions (MongoDB splits a sharded collection into chunks and stores the chunk ranges in that collection). This strategy applies only to sharded collections and is fast and even. Read permission on the config database is required.
  • default: uses the sharded strategy for sharded collections and the split-vector strategy otherwise.
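
A minimal sketch of selecting the sample strategy explicitly is shown below. The table name is hypothetical, and the worked numbers in the comments rest on stated assumptions (a 640 MB collection with 1 KB average document size, and the number of documents per partition being the partition size divided by the average document size); they only illustrate the sample-count formula given for scan.partition.samples.

```sql
-- Hypothetical table that scans with the sample partition strategy.
CREATE TABLE SampledUserTable (
  _id STRING,
  name STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'users',
  'scan.partition.strategy' = 'sample',
  'scan.partition.size' = '64mb',
  'scan.partition.samples' = '10'
);

-- Worked example under the assumptions stated in the lead-in:
--   count of documents      = 640 MB / 1 KB = 655,360
--   documents per partition = 64 MB / 1 KB  = 65,536
--   number of partitions    = 655,360 / 65,536 = 10
--   total samples           = 10 samples per partition * 10 = 100
```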

Lookup Cache

The MongoDB connector can be used in a temporal join as a lookup source (also known as a dimension table). Currently, only sync lookup mode is supported.

By default, lookup cache is not enabled. You can enable it by setting lookup.cache to PARTIAL.

The lookup cache improves the performance of temporal joins against the MongoDB connector. Without a cache, every request is sent to the external database. When the lookup cache is enabled, each process (i.e. TaskManager) holds a cache. Flink looks up the cache first, sends a request to the external database only on a cache miss, and updates the cache with the rows returned. The oldest rows in the cache are expired when the cache reaches the maximum number of cached rows, lookup.partial-cache.max-rows, or when a row exceeds the maximum time to live specified by lookup.partial-cache.expire-after-write or lookup.partial-cache.expire-after-access. Cached rows might not be the latest. Setting the expiration options to smaller values gives fresher data, but may increase the number of requests sent to the database, so this is a trade-off between throughput and correctness.

By default, Flink caches the empty query result for a primary key. You can change this behaviour by setting lookup.partial-cache.caching-missing-key to false.
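
The cache settings described above can be sketched as follows. The table names, columns, and cache values are illustrative assumptions; the Orders table uses Flink's built-in datagen connector only to provide a probe side with a processing-time attribute.

```sql
-- Hypothetical dimension table with a partial lookup cache.
CREATE TABLE CachedUsers (
  _id STRING,
  name STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'users',
  'lookup.cache' = 'PARTIAL',
  -- keep at most 10,000 rows; expire each row 10 minutes after it was cached
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min',
  -- do not cache keys that matched no document
  'lookup.partial-cache.caching-missing-key' = 'false'
);

-- Hypothetical probe-side table with a processing-time attribute.
CREATE TABLE Orders (
  order_id STRING,
  user_id STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Temporal (lookup) join: each order is enriched via the cache first,
-- and MongoDB is queried only on a cache miss.
SELECT o.order_id, u.name
FROM Orders AS o
LEFT JOIN CachedUsers FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u._id;
```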

Idempotent Writes

The MongoDB sink uses upsert semantics rather than plain INSERT statements if a primary key is defined in the DDL. The primary key fields are combined into the document _id, which is the reserved primary key of MongoDB. Writing rows in upsert mode makes the writes idempotent.

If there are failures, the Flink job will recover and re-process from the last successful checkpoint, which can lead to re-processing messages during recovery. The upsert mode is highly recommended, as it helps avoid constraint violations or duplicate data if records need to be re-processed.
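
As a sketch of an upsert workload, the following writes a continuously updating aggregate into MongoDB. The sink table UsersPerAge is hypothetical, and "T" is assumed to be the same source table used in the earlier example, with an age column.

```sql
-- Hypothetical upsert sink keyed by age.
CREATE TABLE UsersPerAge (
  age INT,
  user_count BIGINT,
  PRIMARY KEY (age) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'users_per_age'
);

-- In streaming mode the GROUP BY emits updates as counts change. Because a
-- primary key is defined, each update overwrites the document with the same
-- key, so replaying records after a failure does not create duplicates.
INSERT INTO UsersPerAge
SELECT age, COUNT(*) AS user_count
FROM T
GROUP BY age;
```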

Filters Pushdown

The MongoDB connector supports pushing down simple comparison and logical filters to optimize queries. The mappings from Flink SQL filters to MongoDB query operators are listed in the following table.

| Flink SQL filters | MongoDB Query Operators |
|---|---|
| = | $eq |
| <> | $ne |
| > | $gt |
| >= | $gte |
| < | $lt |
| <= | $lte |
| IS NULL | $eq : null |
| IS NOT NULL | $ne : null |
| OR | $or |
| AND | $and |
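
To make the mapping concrete, here is a small sketch of a query whose WHERE clause uses only operators from the table above. The MongoDB filter in the comment is derived by hand from that mapping; the exact filter document the connector generates is an assumption and may be structured differently.

```sql
-- Query against the MyUserTable defined earlier, using only
-- comparison and logical filters from the mapping table.
SELECT _id, name
FROM MyUserTable
WHERE age >= 18
  AND (status = TRUE OR name IS NULL);

-- Assumed equivalent MongoDB filter, applying the mapping by hand:
--   { $and: [
--       { age: { $gte: 18 } },
--       { $or: [ { status: { $eq: true } }, { name: { $eq: null } } ] }
--   ] }
```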

Data Type Mapping

The field data type mappings from MongoDB BSON types to Flink SQL data types are listed in the following table.

| MongoDB BSON type | Flink SQL type |
|---|---|
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY, VARBINARY |
| Int32 | INTEGER |
| - | TINYINT, SMALLINT, FLOAT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| Object | ROW |
| Array | ARRAY |
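
The mappings above can be exercised in a single table definition, sketched below. The Products table, its columns, and the assumed BSON types of the underlying fields (noted in the comments) are hypothetical.

```sql
-- Hypothetical table; comments give the BSON type assumed for each field.
CREATE TABLE Products (
  _id STRING,                                  -- ObjectId
  name STRING,                                 -- String
  in_stock BOOLEAN,                            -- Boolean
  thumbnail BYTES,                             -- Binary (BYTES is an unbounded VARBINARY)
  quantity INT,                                -- Int32
  views BIGINT,                                -- Int64
  rating DOUBLE,                               -- Double
  price DECIMAL(10, 2),                        -- Decimal128
  updated_at TIMESTAMP_LTZ(3),                 -- DateTime
  dimensions ROW<width DOUBLE, height DOUBLE>, -- Object
  tags ARRAY<STRING>,                          -- Array
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'products'
);
```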

For MongoDB-specific types, the connector uses the Extended JSON format to map them to the Flink SQL STRING type.

| MongoDB BSON type | Flink SQL STRING |
|---|---|
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |