MongoDB-CDC

概述

MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。本文档介绍如何设置 MongoDB CDC 连接器以对 MongoDB 运行 SQL 查询。

支持的版本

Extract 节点版本
MongoDB-CDCMongoDB: >= 3.6

依赖项

I.为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的依赖关系信息

Maven依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-mongodb-cdc</artifactId>
  4. <version>1.2.0-incubating</version>
  5. </dependency>

设置 MongoDB

可用性

  • MongoDB 版本

    MongoDB 版本 >\= 3.6 我们使用 更改流功能(3.6 版中的新功能)来捕获更改数据。

  • 集群部署

    需要 副本集分片集群

  • 存储引擎

    需要 WiredTiger存储引擎。

  • 副本集协议版本

    需要副本集协议版本 1 (pv1)。 从版本 4.0 开始,MongoDB 仅支持 pv1。pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

  • 特权

    changeStream MongoDB Kafka 连接器 read 需要权限。

    您可以使用以下示例进行简单授权。 更详细的授权请参考 MongoDB 数据库用户角色

    1. use admin;
    2. db.createUser({
    3. user: "flinkuser",
    4. pwd: "flinkpw",
    5. roles: [
    6. { role: "read", db: "admin" }, // read role includes changeStream privilege
    7. { role: "readAnyDatabase", db: "admin" } // for snapshot reading
    8. ]
    9. });

如何创建 MongoDB Extract 节点

SQL API 用法

这个例子展示了如何使用 Flink SQL 创建一个 MongoDB Extract 节点:

  1. -- Set checkpoint every 3000 milliseconds
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. -- Create a MySQL table 'mongodb_extract_node' in Flink SQL
  4. Flink SQL> CREATE TABLE mongodb_extract_node (
  5. _id STRING, // must be declared
  6. name STRING,
  7. weight DECIMAL(10,3),
  8. tags ARRAY<STRING>, -- array
  9. price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  10. suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  11. PRIMARY KEY(_id) NOT ENFORCED
  12. ) WITH (
  13. 'connector' = 'mongodb-cdc',
  14. 'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  15. 'username' = 'flinkuser',
  16. 'password' = 'flinkpw',
  17. 'database' = 'inventory',
  18. 'collection' = 'mongodb_extract_node'
  19. );
  20. -- Read snapshot and binlogs from mongodb_extract_node
  21. Flink SQL> SELECT * FROM mongodb_extract_node;

注意

MongoDB 的更改事件记录在消息之前没有更新。所以,我们只能将其转换为 Flink 的 UPSERT 变更日志流。UPSERT 流需要唯一键,因此我们必须声明 _id 为主键。我们不能将其他列声明为主键,因为删除操作不包含除 _idsharding key 之外的键和值。

InLong Dashboard 用法

TODO: 未来会支持

InLong Manager 用法

TODO: 未来会支持

MongoDB Extract 节点参数

选项是否必须默认类型描述
connector必须(none)String指定要使用的连接器,这里应该是mongodb-cdc.
hosts必须(none)StringMongoDB 服务器的主机名和端口对的逗号分隔列表。例如。localhost:27017,localhost:27018
username可选(none)String连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。
password可选(none)String连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。
database必须(none)String要监视更改的数据库的名称。
collection必须(none)String数据库中要监视更改的集合的名称。
connection.options可选(none)StringMongoDB的 & 分隔连接选项。例如。replicaSet=test&connectTimeoutMS=300000
errors.tolerance可选noneString如果遇到错误,是否继续处理消息。接受noneall。设置为none时,连接器会报告错误并在遇到错误时阻止对其余记录的进一步处理。设置为all时,连接器会静默忽略任何错误消息。
errors.log.enable可选trueBoolean是否应将失败操作的详细信息写入日志文件。
copy.existing可选trueBoolean是否从源集合中复制现有数据。
copy.existing.pipeline可选(none)String一组 JSON 对象,描述在复制现有数据时要运行的管道操作。这可以提高复制管理器对索引的使用,并使复制更有效。例如。[{“$match”: {“closed”: “false”}}]确保仅复制已关闭字段设置为 false 的文档。
copy.existing.max.threads可选处理器数量Integer执行数据复制时使用的线程数。
copy.existing.queue.size可选16000Integer执行数据复制时使用的线程数。
poll.max.batch.size可选1000Integer轮询新数据时,单个批次中包含的最大更改流文档数。
poll.await.time.ms可选1500Integer在更改流上检查新结果之前等待的时间量。
heartbeat.interval.ms可选0Integer发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。

可用元数据

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

Key数据类型描述
database_nameSTRING NOT NULL包含该行的数据库的名称。
collection_nameSTRING NOT NULL包含该行的集合的名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

  1. CREATE TABLE `mysql_extract_node` (
  2. db_name STRING METADATA FROM 'database_name' VIRTUAL,
  3. collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
  4. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  5. _id STRING, // must be declared
  6. name STRING,
  7. weight DECIMAL(10,3),
  8. tags ARRAY<STRING>, -- array
  9. price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  10. suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  11. PRIMARY KEY(_id) NOT ENFORCED
  12. ) WITH (
  13. 'connector' = 'mongodb-cdc',
  14. 'hostname' = 'YourHostname',
  15. 'username' = 'YourUsername',
  16. 'password' = 'YourPassword',
  17. 'database' = 'YourDatabase',
  18. 'collection' = 'YourTable'
  19. );

数据类型映射

BSON 类型Flink SQL 类型
TINYINT
SMALLINT
IntINT
LongBIGINT
FLOAT
DoubleDOUBLE
Decimal128DECIMAL(p, s)
BooleanBOOLEAN
Date TimestampDATE
Date TimestampTIME
DateTIMESTAMP(3) TIMESTAMP_LTZ(3)
TimestampTIMESTAMP(0) TIMESTAMP_LTZ(0)
String ObjectId UUID Symbol MD5 JavaScript RegexSTRING
BinDataBYTES
ObjectROW
ArrayARRAY
DBPointerROW\<\$ref STRING, \$id STRING>
GeoJSONPoint : ROW\<type STRING, coordinates ARRAY\<DOUBLE>> Line : ROW\<type STRING, coordinates ARRAY\<ARRAY\< DOUBLE>>> …