MongoDB CDC

Prepare MongoDB Bundled Jar

flink-sql-connector-mongodb-cdc-*.jar

Only CDC 3.1+ is supported.
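
A minimal sketch of how the bundled jar can be put in place, assuming a standard Flink installation (the jar version below is only a placeholder; use the one matching your Flink CDC 3.1+ setup):

# Copy the MongoDB CDC bundled jar into Flink's lib directory so the
# paimon-flink-action job can find the MongoDB CDC source at runtime.
cp /path/to/flink-sql-connector-mongodb-cdc-3.1.0.jar ${FLINK_HOME}/lib/

# For a standalone cluster, restart it so the newly added jar is picked up.
${FLINK_HOME}/bin/stop-cluster.sh && ${FLINK_HOME}/bin/start-cluster.sh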

Synchronizing Tables

By using MongoDBSyncTableAction in a Flink DataStream job or directly through flink run, users can synchronize one collection from MongoDB into one Paimon table.

To use this feature through flink run, run the following shell command.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_table \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition_keys <partition_keys>] \
    [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
    [--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration / Description

--warehouse
The path to Paimon warehouse.
--database
The database name in Paimon catalog.
--table
The Paimon table name.
--partition_keys
The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm".
--computed_column
The definitions of computed columns. The argument fields are taken from the MongoDB collection field names. See here for a complete list of configurations.
--mongodb_conf
The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its document for a complete list of configurations.
--catalog_conf
The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations.
--table_conf
The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations.

Here are a few points to take note of:

  1. The mongodb_conf introduces the schema.start.mode parameter on top of the MongoDB CDC source configuration. schema.start.mode provides two modes: dynamic (default) and specified. In dynamic mode, MongoDB schema information is parsed at one level, which forms the basis for schema change evolution. In specified mode, synchronization takes place according to specified criteria. This can be done by configuring field.name to specify the synchronization fields and parser.path to specify the JSON parsing path for those fields. The difference between the two is that specified mode requires the user to explicitly identify the fields to be used and create a mapping table based on those fields, whereas dynamic mode ensures that Paimon and MongoDB always keep the top-level fields consistent, eliminating the need to focus on specific fields. Further processing of the data table is required when using values from nested fields. (The JsonPath syntax accepted by parser.path is summarized in the tables below.)
  2. The mongodb_conf introduces the default.id.generation parameter as an enhancement to the MongoDB CDC source configuration. The default.id.generation setting offers two distinct behaviors: when set to true and when set to false. When default.id.generation is set to true, the MongoDB CDC source adheres to the default _id generation strategy, which involves stripping the outer $oid nesting to provide a more straightforward identifier. This mode simplifies the _id representation, making it more direct and user-friendly. On the contrary, when default.id.generation is set to false, the MongoDB CDC source retains the original _id structure, without any additional processing. This mode offers users the flexibility to work with the raw _id format as provided by MongoDB, preserving any nested elements like $oid. The choice between the two hinges on the user’s preference: the former for a cleaner, simplified _id and the latter for a direct representation of MongoDB’s _id structure.
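
For illustration, the sketch below shows how default.id.generation might be set on a table sync job; the connection settings mirror the examples later on this page, and the ObjectId in the comment is made up.

# With default.id.generation=true (the default), a raw change-stream _id such as
#   {"$oid": "64b7f4c2e1a5f3a9c0d12345"}
# is stored in the _id column as the plain string 64b7f4c2e1a5f3a9c0d12345.
# Setting it to false keeps the original nested structure instead.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --mongodb_conf default.id.generation=false
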
Operator / Description
$
The root element to query. This starts all path expressions.
@
The current node being processed by a filter predicate.
*
Wildcard. Available anywhere a name or numeric are required.
..
Deep scan. Available anywhere a name is required.
.
Dot-notated child.
['{name}' (, '{name}')]
Bracket-notated child or children.
[{number} (, {number})]
Bracket-notated child or children.
[start:end]
Array index or indexes.
[?({expression})]
Filter expression. Expression must evaluate to a boolean value.

Functions can be invoked at the tail end of a path - the input to a function is the output of the path expression. The function output is dictated by the function itself.

Function / Description / Output type

min()
Provides the min value of an array of numbers. Output type: Double.
max()
Provides the max value of an array of numbers. Output type: Double.
avg()
Provides the average value of an array of numbers. Output type: Double.
stddev()
Provides the standard deviation value of an array of numbers. Output type: Double.
length()
Provides the length of an array. Output type: Integer.
sum()
Provides the sum value of an array of numbers. Output type: Double.
keys()
Provides the property keys (an alternative for terminal tilde ~). Output type: Set.
concat(X)
Provides a concatenated version of the path output with a new item. Output type: like input.
append(X)
Adds an item to the json path output array. Output type: like input.
first()
Provides the first item of an array. Output type: depends on the array.
last()
Provides the last item of an array. Output type: depends on the array.
index(X)
Provides the item of an array at index X; if X is negative, take from backwards. Output type: depends on the array.

Path Examples

{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
JsonPath / Result

$.store.book[*].author
The authors of all books.
$..author
All authors.
$.store.*
All things, both books and bicycles.
$.store..price
The price of everything.
$..book[2]
The third book.
$..book[-2]
The second to last book.
$..book[0,1]
The first two books.
$..book[:2]
All books from index 0 (inclusive) until index 2 (exclusive).
$..book[1:2]
All books from index 1 (inclusive) until index 2 (exclusive).
$..book[-2:]
Last two books.
$..book[2:]
All books from index 2 (inclusive) to last.
$..book[?(@.isbn)]
All books with an ISBN number.
$.store.book[?(@.price < 10)]
All books in store cheaper than 10.
$..book[?(@.price <= $['expensive'])]
All books in store that are not "expensive".
$..book[?(@.author =~ /.*REES/i)]
All books matching regex (ignore case).
$..*
Give me every thing.
$..book.length()
The number of books.
  3. The synchronized table is required to have its primary key set as _id. This is because MongoDB's change events are recorded before updates in messages. Consequently, we can only convert them into Flink's UPSERT changelog stream. The upsert stream demands a unique key, which is why we must declare _id as the primary key. Declaring other columns as primary keys is not feasible, as delete operations only encompass the _id and sharding key, excluding other keys and values.

  4. MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database, and one of its core features is the dynamic schema, where documents can contain different fields, and the data types of fields can be flexible. Therefore, the absence of data type definitions in Change Streams is to maintain this flexibility and extensibility. For this reason, we have set all field data types for synchronizing MongoDB to Paimon as String to address the issue of not being able to obtain data types.

If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from the MongoDB collection.
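
As a rough illustration (the document below is made up): in dynamic mode, a collection whose documents look like {"_id": {"$oid": "..."}, "name": "alice", "age": 30, "address": {"city": "Hangzhou"}} would yield a table with _id as the primary key and columns _id, name, age and address, all typed STRING, with the nested address object stored as a JSON string.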

Example 1: synchronize collection into one Paimon table

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --computed_column '_year=year(age)' \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4

Example 2: Synchronize collection into a Paimon table according to the specified field mapping.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --mongodb_conf schema.start.mode=specified \
    --mongodb_conf field.name=_id,name,description \
    --mongodb_conf parser.path=$._id,$.name,$.description \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
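
Example 2 only lifts top-level fields. Since parser.path accepts JsonPath expressions (see the reference above), nested values can be extracted as well; the sketch below (the field and column names are illustrative, not from the source) maps a nested address.city value to a city column:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --mongodb_conf collection=source_table1 \
    --mongodb_conf schema.start.mode=specified \
    --mongodb_conf field.name=_id,name,city \
    --mongodb_conf parser.path=$._id,$.name,$.address.city \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4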

Synchronizing Databases

By using MongoDBSyncDatabaseAction in a Flink DataStream job or directly through flink run, users can synchronize the whole MongoDB database into one Paimon database.

To use this feature through flink run, run the following shell command.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_database \
    --warehouse <warehouse-path> \
    --database <database-name> \
    [--table_prefix <paimon-table-prefix>] \
    [--table_suffix <paimon-table-suffix>] \
    [--including_tables <mongodb-table-name|name-regular-expr>] \
    [--excluding_tables <mongodb-table-name|name-regular-expr>] \
    [--partition_keys <partition_keys>] \
    [--primary_keys <primary-keys>] \
    [--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration / Description

--warehouse
The path to Paimon warehouse.
--database
The database name in Paimon catalog.
--table_prefix
The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods" as prefix, you can specify "--table_prefix ods".
--table_suffix
The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix".
--including_tables
It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, a comma is required, for example: 'a|b|c'. Regular expression is supported, for example, specifying "--including_tables test|paimon.*" means to synchronize table 'test' and all tables starting with 'paimon'.
--excluding_tables
It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both.
--partition_keys
The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm". If the keys are not in source table, the sink table won't set partition keys.
--primary_keys
The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". If the keys are not in source table, but the source table has primary keys, the sink table will use source table's primary keys. Otherwise, the sink table won't set primary keys.
--mongodb_conf
The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database are required configurations, others are optional. See its document for a complete list of configurations.
--catalog_conf
The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations.
--table_conf
The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations.

All collections to be synchronized need to set _id as the primary key. For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table. Its schema will be derived from all specified MongoDB collections. If the Paimon table already exists, its schema will be compared against the schema of all specified MongoDB collections. Any MongoDB tables created after the commencement of the task will automatically be included.

Example 1: synchronize entire database

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4

Example 2: Synchronize the specified table.

<FLINK_HOME>/bin/flink run \
    --fromSavepoint savepointPath \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --including_tables 'product|user|address|order|custom'
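
A hedged variation of the command above (the collection names, exclusion pattern and prefix are illustrative) that additionally prefixes every created Paimon table and skips temporary collections via --excluding_tables:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    mongodb_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table_prefix "ods_" \
    --including_tables 'product|user|address|order|custom' \
    --excluding_tables '.*_tmp' \
    --mongodb_conf hosts=127.0.0.1:27017 \
    --mongodb_conf username=root \
    --mongodb_conf password=123456 \
    --mongodb_conf database=source_db \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4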