将变更流与 Amazon DocumentDB 结合使用

中的变更流功能提供按时间顺序排列的更改事件序列,这些事件在您的集群集合内发生。Amazon DocumentDB(与 MongoDB 兼容)您可以从变更流读取事件以实施许多不同的使用案例,其中包括:

  • 更改通知

  • 使用 Amazon Elasticsearch Service (Amazon ES) 进行全文搜索

  • 使用 Amazon Redshift 分析

应用程序可以使用变更流在各个集合中订阅数据变更。变更流事件在集群上发生时按顺序排列,并在记录事件之后存储 3 个小时(默认情况下)。可使用 change_stream_log_retention_duration 参数将保留期延长至最多 7 天。要修改变更流保留期,请参阅修改变更流日志保留持续时间

支持的 操作

Amazon DocumentDB 支持更改流的以下操作:

  • MongoDB db.collection.watch()db.watch()client.watch() API 中支持的所有更改事件。

  • 更新的完整文档查找。

  • 聚合阶段:$match$project$redact$addFields$replaceRoot

  • 从恢复令牌恢复变更流

  • 使用 startAtOperation 从时间戳恢复变更流(适用于 Amazon DocumentDB v4.0+)

Billing

默认情况下,Amazon DocumentDB 变更流功能处于禁用状态,并且在启用该功能之前不会产生任何额外费用。在集群中使用变更流将产生额外的读取和写入 IOs 和存储费用。您可以使用 modifyChangeStreams API 操作为集群启用此功能。有关定价的更多信息,请参阅 Amazon DocumentDB 定价

Limitations

变更流在 Amazon DocumentDB 中具有以下限制:

  • 变更流只能通过与 Amazon DocumentDB 集群主实例的连接打开。当前不支持从副本实例上的变更流中进行读取。在调用 watch() API 操作时,您必须指定 primary 读取首选项,以确保所有读取都定向到主实例(请参阅示例部分)。

  • 写入集合的变更流的事件最多可在 7 天内使用(默认为 3 小时)。变更流数据将在日志保留时段过后删除,即使没有发生新更改也是如此。

  • 对集合执行的长时间运行的写入操作(如 updateManydeleteMany)可能会临时暂停变更流事件的写入,直到长时间运行的写入操作完成为止。

  • Amazon DocumentDB 不支持 MongoDB 操作日志 (oplog)。

  • 使用 Amazon DocumentDB,您必须在给定集合上显式启用变更流。

  • 如果变更流事件的总大小(包括变更数据,在请求的情况下还包括完整文档)大于 16 MB,客户端将在变更流上遇到读取失败情况。

  • db.watch()client.watch() 与 Amazon DocumentDB v3.6 结合使用时,目前不支持 Ruby 驱动程序。

启用变更流

您可以为给定数据库中的所有集合启用 Amazon DocumentDB 变更流,也可以仅为选定的集合启用。以下是如何使用 mongo shell 为不同使用案例启用变更流的示例。指定数据库和集合名称时,空字符串将被视为通配符。

  1. //Enable change streams for the collection "foo" in database "bar"
  2. db.adminCommand({modifyChangeStreams: 1,
  3. database: "bar",
  4. collection: "foo",
  5. enable: true});
  1. //Disable change streams on collection "foo" in database "bar"
  2. db.adminCommand({modifyChangeStreams: 1,
  3. database: "bar",
  4. collection: "foo",
  5. enable: false});
  1. //Enable change streams for all collections in database "bar"
  2. db.adminCommand({modifyChangeStreams: 1,
  3. database: "bar",
  4. collection: "",
  5. enable: true});
  1. //Enable change streams for all collections in all databases in a cluster
  2. db.adminCommand({modifyChangeStreams: 1,
  3. database: "",
  4. collection: "",
  5. enable: true});

如果满足以下任意条件,则将为集合启用变更流:

  • 数据库和集合均显式启用。

  • 包含集合的数据库已启用。

  • 启用所有数据库。

如果父数据库还启用了更改流或者集群中的所有数据库已启用,则从数据库中删除集合不会禁用该集合的更改流。如果创建了与已删除集合同名的新集合,则将为该集合启用更改流。

您可以使用 $listChangeStreams 聚合管道阶段列出所有已启用集群的更改流。管道中可以使用 Amazon DocumentDB 支持的所有聚合阶段进行额外处理。如果以前启用的某个集合被禁用,则该集合将不会显示在 $listChangeStreams 输出中。

  1. //List all databases and collections with change streams enabled
  2. cursor = new DBCommandCursor(db,
  3. db.runCommand(
  4. {aggregate: 1,
  5. pipeline: [{$listChangeStreams: 1}],
  6. cursor:{}}));
  1. //List of all databases and collections with change streams enabled
  2. { "database" : "test", "collection" : "foo" }
  3. { "database" : "bar", "collection" : "" }
  4. { "database" : "", "collection" : "" }
  1. //Determine if the database “bar” or collection “bar.foo” have change streams enabled
  2. cursor = new DBCommandCursor(db,
  3. db.runCommand(
  4. {aggregate: 1,
  5. pipeline: [{$listChangeStreams: 1},
  6. {$match: {$or: [{database: "bar", collection: "foo"},
  7. {database: "bar", collection: ""},
  8. {database: "", collection: ""}]}}
  9. ],
  10. cursor:{}}));

例如:将变更流与 Python 结合使用

以下是在集合级别将 Amazon DocumentDB 变更流与 Python 结合使用的示例。

  1. import os
  2. import sys
  3. from pymongo import MongoClient, ReadPreference
  4. username = "DocumentDBusername"
  5. password = <Insert your password>
  6. clusterendpoint = "DocumentDBClusterEndpoint”
  7. client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem')
  8. db = client['bar']
  9. #While ‘Primary’ is the default read preference, here we give an example of
  10. #how to specify the required read preference when reading the change streams
  11. coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY)
  12. #Create a stream object
  13. stream = coll.watch()
  14. #Write a new document to the collection to generate a change event
  15. coll.insert_one({'x': 1})
  16. #Read the next change event from the stream (if any)
  17. print(stream.try_next())
  18. """
  19. Expected Output:
  20. {'_id': {'_data': '015daf94f600000002010000000200009025'},
  21. 'clusterTime': Timestamp(1571788022, 2),
  22. 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
  23. 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
  24. 'ns': {'coll': 'foo', 'db': 'bar'},
  25. 'operationType': 'insert'}
  26. """
  27. #A subsequent attempt to read the next change event returns nothing, as there are no new changes
  28. print(stream.try_next())
  29. """
  30. Expected Output:
  31. None
  32. """
  33. #Generate a new change event by updating a document
  34. result = coll.update_one({'x': 1}, {'$set': {'x': 2}})
  35. print(stream.try_next())
  36. """
  37. Expected Output:
  38. {'_id': {'_data': '015daf99d400000001010000000100009025'},
  39. 'clusterTime': Timestamp(1571789268, 1),
  40. 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
  41. 'ns': {'coll': 'foo', 'db': 'bar'},
  42. 'operationType': 'update',
  43. 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}}
  44. """

以下是在数据库级别将 Amazon DocumentDB 变更流与 Python 结合使用的示例。

  1. import os
  2. import sys
  3. from pymongo import MongoClient
  4. username = "DocumentDBusername"
  5. password = <Insert your password>
  6. clusterendpoint = "DocumentDBClusterEndpoint”
  7. client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem')
  8. db = client['bar']
  9. #Create a stream object
  10. stream = db.watch()
  11. coll = db.get_collection('foo')
  12. #Write a new document to the collection foo to generate a change event
  13. coll.insert_one({'x': 1})
  14. #Read the next change event from the stream (if any)
  15. print(stream.try_next())
  16. """
  17. Expected Output:
  18. {'_id': {'_data': '015daf94f600000002010000000200009025'},
  19. 'clusterTime': Timestamp(1571788022, 2),
  20. 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
  21. 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
  22. 'ns': {'coll': 'foo', 'db': 'bar'},
  23. 'operationType': 'insert'}
  24. """
  25. #A subsequent attempt to read the next change event returns nothing, as there are no new changes
  26. print(stream.try_next())
  27. """
  28. Expected Output:
  29. None
  30. """
  31. coll = db.get_collection('foo1')
  32. #Write a new document to another collection to generate a change event
  33. coll.insert_one({'x': 1})
  34. print(stream.try_next())
  35. """
  36. Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database
  37. {'_id': {'_data': '015daf94f600000002010000000200009025'},
  38. 'clusterTime': Timestamp(1571788022, 2),
  39. 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
  40. 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
  41. 'ns': {'coll': 'foo1', 'db': 'bar'},
  42. 'operationType': 'insert'}
  43. """

完整文档查找

更新更改事件不包括完整文档;它仅包含已进行的更改。如果您的使用案例需要用到受更新影响的完整文档,则可以在打开流时启用完整文档查找。

更新变更流事件的 fullDocument 文档会指明文档查找时已更新文档的最新版本。如果在更新操作和 fullDocument 查找之间发生了更改,则 fullDocument 文档可能无法表示更新时的文档状态。

  1. #Create a stream object with update lookup enabled
  2. stream = coll.watch(full_document='updateLookup')
  3. #Generate a new change event by updating a document
  4. result = coll.update_one({'x': 2}, {'$set': {'x': 3}})
  5. stream.try_next()
  6. #Output:
  7. {'_id': {'_data': '015daf9b7c00000001010000000100009025'},
  8. 'clusterTime': Timestamp(1571789692, 1),
  9. 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
  10. 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
  11. 'ns': {'coll': 'foo', 'db': 'bar'},
  12. 'operationType': 'update',
  13. 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

恢复变更流

您可以在以后通过使用恢复令牌来恢复变更流,该令牌相当于上次检索的变更事件文档的 _id 字段。

  1. import os
  2. import sys
  3. from pymongo import MongoClient
  4. username = "DocumentDBusername"
  5. password = <Insert your password>
  6. clusterendpoint = "DocumentDBClusterEndpoint”
  7. client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem', retryWrites='false')
  8. db = client['bar']
  9. coll = db.get_collection('foo')
  10. #Create a stream object
  11. stream = db.watch()
  12. coll.update_one({'x': 1}, {'$set': {'x': 4}})
  13. event = stream.try_next()
  14. token = event['_id']
  15. print(token)
  16. """
  17. Output: This is the resume token that we will later us to resume the change stream
  18. {'_data': '015daf9c5b00000001010000000100009025'}
  19. """
  20. #Python provides a nice shortcut for getting a stream’s resume token
  21. print(stream.resume_token)
  22. """
  23. Output
  24. {'_data': '015daf9c5b00000001010000000100009025'}
  25. """
  26. #Generate a new change event by updating a document
  27. result = coll.update_one({'x': 4}, {'$set': {'x': 5}})
  28. #Generate another change event by inserting a document
  29. result = coll.insert_one({'y': 5})
  30. #Open a stream starting after the selected resume token
  31. stream = db.watch(full_document='updateLookup', resume_after=token)
  32. #Our first change event is the update with the specified _id
  33. print(stream.try_next())
  34. """
  35. #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5}
  36. {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'},
  37. 'operationType': 'update',
  38. 'clusterTime': Timestamp(1602129676, 6),
  39. 'ns': {'db': 'bar', 'coll': 'foo'},
  40. 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')},
  41. 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5},
  42. 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}}
  43. """
  44. #Followed by the insert
  45. print(stream.try_next())
  46. """
  47. #Output:
  48. {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'},
  49. 'operationType': 'insert',
  50. 'clusterTime': Timestamp(1602129676, 7),
  51. 'ns': {'db': 'bar', 'coll': 'foo'},
  52. 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')},
  53. 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}}
  54. """

使用 startAtOperationTime 恢复更改流

稍后,您可以使用 startAtOperationTime 从特定时间戳恢复更改流。

注意

4.0 版以上的版本提供了使用 startAtOperationTime 的功能。Amazon DocumentDB使用 startAtOperationTime 时,变更流光标将仅返回在指定时间戳或之后发生的更改。和 startAtOperationTime 命令相互排斥,因此无法一起使用。resumeAfter

  1. import os
  2. import sys
  3. from pymongo import MongoClient
  4. username = "DocumentDBusername"
  5. password = <Insert your password>
  6. clusterendpoint = "DocumentDBClusterEndpoint”
  7. client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-root-ca-2020.pem',retryWrites='false')
  8. db = client['bar']
  9. coll = db.get_collection('foo')
  10. #Create a stream object
  11. stream = db.watch()
  12. coll.update_one({'x': 1}, {'$set': {'x': 4}})
  13. event = stream.try_next()
  14. timestamp = event['clusterTime']
  15. print(timestamp)
  16. """
  17. Output
  18. Timestamp(1602129114, 4)
  19. """
  20. #Generate a new change event by updating a document
  21. result = coll.update_one({'x': 4}, {'$set': {'x': 5}})
  22. result = coll.insert_one({'y': 5})
  23. #Generate another change event by inserting a document
  24. #Open a stream starting after specified time stamp
  25. stream = db.watch(start_at_operation_time=timestamp)
  26. print(stream.try_next())
  27. """
  28. #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event
  29. {'_id': {'_data': '015f7e941a000000030100000003000fe038'},
  30. 'operationType': 'update',
  31. 'clusterTime': Timestamp(1602130970, 3),
  32. 'ns': {'db': 'bar', 'coll': 'foo'},
  33. 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')},
  34. 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}}
  35. """
  36. print(stream.try_next())
  37. """
  38. #Output: The second event will be the subsequent update operation (x:5)
  39. {'_id': {'_data': '015f7e9502000000050100000005000fe038'},
  40. 'operationType': 'update',
  41. 'clusterTime': Timestamp(1602131202, 5),
  42. 'ns': {'db': 'bar', 'coll': 'foo'},
  43. 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')},
  44. 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}}
  45. """
  46. print(stream.try_next())
  47. """
  48. #Output: And finally the last event will be the insert operation (y:5)
  49. {'_id': {'_data': '015f7e9502000000060100000006000fe038'},
  50. 'operationType': 'insert',
  51. 'clusterTime': Timestamp(1602131202, 6),
  52. 'ns': {'db': 'bar', 'coll': 'foo'},
  53. 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')},
  54. 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}}
  55. """

变更流中的事务

变更流事件将不包含未提交和/或中止的事务中的事件。例如,如果您使用一个 INSERT 操作和一个 UPDATE 操作和 启动事务。如果您的 INSERT 操作成功,但 UPDATE 操作失败,事务将回滚。由于此事务已回滚,因此您的变更流不会包含此事务的任何事件。

修改变更流日志保留持续时间

您可以使用 AWS 管理控制台或 AWS CLI 将变更流日志的保留时间修改为 1 小时到 7 天之间的时间。

修改变更流日志保留期限的步骤

  1. 通过以下网址登录 AWS 管理控制台并打开 Amazon DocumentDB 控制台:https://console.aws.amazon.com/docdb

  2. 在导航窗格中,选择参数组

    提示

    如果您在屏幕左侧没有看到导航窗格,请在页面左上角选择菜单图标 (使用变更流 - 图1)。

  3. Parameter groups (参数组) 窗格中,选择与您的集群关联的集群参数组。要标识与集群关联的集群参数组,请参阅确定 Amazon DocumentDB 集群的参数组

  4. 生成的页面显示了集群参数组的参数及其相应的详细信息。选择 change_stream_log_retention_duration 参数。

  5. 在页面右上角,选择编辑以更改参数的值。可以将 change_stream_log_retention_duration 参数修改为介于 1 到 7 天之间。

  6. 进行更改,然后选择 Modify cluster parameter (修改集群参数) 以保存更改。要放弃更改,请选择 Cancel (取消)

要修改集群参数组的 change_stream_log_retention_duration 参数,请使用带以下参数的 modify-db-cluster-parameter-group 操作:

  • --db-cluster-parameter-group-name — 必需。您正在修改的集群参数组的名称。要标识与集群关联的集群参数组,请参阅确定 Amazon DocumentDB 集群的参数组

  • --parameters — 必需。您正在修改的参数。每个参数条目必须包含以下内容:

    • ParameterName — 您正在修改的参数的名称。在本例中,它是 change_stream_log_retention_duration

    • ParameterValue — 此参数的新值。

    • ApplyMethod — 您希望如何对应用的参数进行更改。允许的值为 immediatepending-reboot

      注意

      具有 ApplyTypestatic 参数必须具有 ApplyMethodpending-reboot

  1. 要更改参数 change_stream_log_retention_duration 的值,请运行以下命令并将 parameter-value 替换为要将参数修改为的值。

    对于 Linux、macOS 或 Unix:

    1. aws docdb modify-db-cluster-parameter-group \
    2. --db-cluster-parameter-group-name sample-parameter-group \
    3. --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    对于 Windows:

    1. aws docdb modify-db-cluster-parameter-group ^
    2. --db-cluster-parameter-group-name sample-parameter-group ^
    3. --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    此操作的输出将类似于下文(JSON 格式)。

    1. {
    2. "DBClusterParameterGroupName": "sample-parameter-group"
    3. }
  2. 等待至少 5 分钟。

  3. 列出 sample-parameter-group 的参数值,确保您的更改已执行。

    对于 Linux、macOS 或 Unix:

    1. aws docdb describe-db-cluster-parameters \
    2. --db-cluster-parameter-group-name sample-parameter-group

    对于 Windows:

    1. aws docdb describe-db-cluster-parameters ^
    2. --db-cluster-parameter-group-name sample-parameter-group

    此操作的输出将类似于下文(JSON 格式)。

    1. {
    2. "Parameters": [
    3. {
    4. "ParameterName": "audit_logs",
    5. "ParameterValue": "disabled",
    6. "Description": "Enables auditing on cluster.",
    7. "Source": "system",
    8. "ApplyType": "dynamic",
    9. "DataType": "string",
    10. "AllowedValues": "enabled,disabled",
    11. "IsModifiable": true,
    12. "ApplyMethod": "pending-reboot"
    13. },
    14. {
    15. "ParameterName": "change_stream_log_retention_duration",
    16. "ParameterValue": "12345",
    17. "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.",
    18. "Source": "user",
    19. "ApplyType": "dynamic",
    20. "DataType": "integer",
    21. "AllowedValues": "3600-86400",
    22. "IsModifiable": true,
    23. "ApplyMethod": "immediate"
    24. }
    25. ]
    26. }