Transactions

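Amazon DocumentDB (with MongoDB compatibility) now supports MongoDB 4.0 compatibility, including transactions. You can perform transactions across multiple documents, statements, collections, and databases. Transactions simplify application development by letting you perform atomic, consistent, isolated, and durable (ACID) operations across one or more documents within an Amazon DocumentDB cluster. Common use cases for transactions include financial processing, fulfilling and managing orders, and building multi-player games.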

There is no additional cost for transactions. You pay only for the read and write I/Os that you consume as part of the transactions.

Requirements

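To use the transactions feature, you need to meet the following requirements:

  • You must be using the Amazon DocumentDB 4.0 engine.

  • You must use a driver compatible with MongoDB 4.0 or greater.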

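Best Practices

Here are some best practices to help you get the most out of transactions with Amazon DocumentDB.

  • Always commit or abort a transaction once its work is done. Leaving a transaction in an incomplete state ties up database resources and can cause write conflicts.

  • Keep each transaction to the smallest number of commands it needs. If a transaction contains many statements that can be split into several smaller transactions, split them to reduce the likelihood of a timeout. Always aim for short transactions rather than long-running reads.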
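The two practices above can be sketched in Python as follows. This is a minimal illustration rather than an official pattern: the helper names (chunked, run_in_transaction, apply_in_small_transactions) and the default batch size are assumptions introduced here, and the session argument is expected to behave like a PyMongo ClientSession (start_transaction, commit_transaction, abort_transaction). Splitting work into batches gives up atomicity across batches, so it only fits work that can safely be divided.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def run_in_transaction(session, work):
    """Run work(session) in one transaction and always commit or abort it."""
    session.start_transaction()
    try:
        result = work(session)
    except Exception:
        # Never leave the transaction open: abort, then surface the error.
        session.abort_transaction()
        raise
    session.commit_transaction()
    return result


def apply_in_small_transactions(session, operations, batch_size=10):
    """Apply callables in several short transactions instead of one long one.

    Each operation is a callable that takes the session. batch_size is a
    hypothetical default, not a documented limit.
    """
    for batch in chunked(operations, batch_size):
        run_in_transaction(session, lambda s, b=batch: [op(s) for op in b])
```

With a real driver, each operation would typically be a small function such as lambda s: collection.update_one(filter, update, session=s).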

Limitations

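  • Amazon DocumentDB does not support cursors within a transaction.

  • Amazon DocumentDB cannot create new collections in a transaction and cannot query or update non-existing collections.

  • Document-level write locks are subject to a 1-minute timeout, which is not configurable by users.

  • Retryable writes, retryable commit, and retryable abort are not supported.

  • Each Amazon DocumentDB instance has an upper limit on the number of concurrent transactions open on the instance at the same time. For the limits, see Instance Limits.

  • For a given transaction, the transaction log size must be less than 32 MB.

  • Amazon DocumentDB supports count() within a transaction, but not all drivers support this capability. An alternative is to use the countDocuments() API, which translates the count query into an aggregation query on the client side.

  • Transactions have a one-minute execution limit, and sessions have a 30-minute timeout. If a transaction times out, it is aborted, and any subsequent commands issued within the session for the existing transaction yield the following error:

    WriteCommandError({
        "ok" : 0,
        "operationTime" : Timestamp(1603491424, 627726),
        "code" : 251,
        "errmsg" : "Given transaction number 0 does not match any in-progress transactions."
    })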
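An application can recognize the error above by its code and restart the transaction from the beginning instead of issuing more commands in it. The following is a minimal Python sketch; the function name transaction_expired and the constant name are assumptions introduced here, and the input is a plain dictionary shaped like the error document shown above.

```python
# Error code reported in the sample error document above.
NO_SUCH_TRANSACTION = 251


def transaction_expired(error_doc):
    """Return True if error_doc reports that the transaction no longer exists."""
    return error_doc.get("ok") == 0 and error_doc.get("code") == NO_SUCH_TRANSACTION


# Sample document modeled on the error above (operationTime omitted).
sample_error = {
    "ok": 0,
    "code": 251,
    "errmsg": "Given transaction number 0 does not match any in-progress transactions.",
}

if transaction_expired(sample_error):
    print("Transaction was aborted by the server; start a new transaction.")
```

With PyMongo, a document of this shape is typically available on the details attribute of a raised OperationFailure.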

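Monitoring and Diagnostics

With support for transactions in Amazon DocumentDB 4.0, additional CloudWatch metrics were added to help you monitor your transactions.

New CloudWatch Metrics

  • DatabaseTransactions: The number of open transactions, sampled over a one-minute period.

  • DatabaseTransactionsAborted: The number of aborted transactions, sampled over a one-minute period.

  • DatabaseTransactionsMax: The maximum number of open transactions in a one-minute period.

  • TransactionsAborted: The number of transactions aborted on an instance in a one-minute period.

  • TransactionsCommitted: The number of transactions committed on an instance in a one-minute period.

  • TransactionsOpen: The number of transactions open on an instance, sampled over a one-minute period.

  • TransactionsOpenMax: The maximum number of transactions open on an instance in a one-minute period.

  • TransactionsStarted: The number of transactions started on an instance in a one-minute period.

Note

For more CloudWatch metrics for Amazon DocumentDB, see Monitoring Amazon DocumentDB with CloudWatch.

Additionally, new fields were added to both currentOp (lsid, transactionThreadId, and a new "idle transaction" state) and serverStatus (transactions: currentActive, currentInactive, currentOpen, totalAborted, totalCommitted, and totalStarted).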
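As a rough illustration of how the serverStatus counters could be used, the Python sketch below derives abort and commit ratios from a serverStatus-style document. It assumes the six fields are nested under a "transactions" key, as listed above. The function name summarize_transactions and the sample numbers are made up for this example.

```python
def summarize_transactions(server_status):
    """Summarize the transaction counters of a serverStatus-style document."""
    txn = server_status.get("transactions", {})
    started = txn.get("totalStarted", 0)
    aborted = txn.get("totalAborted", 0)
    committed = txn.get("totalCommitted", 0)
    return {
        "open": txn.get("currentOpen", 0),
        "active": txn.get("currentActive", 0),
        "inactive": txn.get("currentInactive", 0),
        # Guard against division by zero before any transaction has started.
        "abort_ratio": aborted / started if started else 0.0,
        "commit_ratio": committed / started if started else 0.0,
    }


# Illustrative values only; not taken from a real cluster.
sample_status = {
    "transactions": {
        "currentActive": 2,
        "currentInactive": 1,
        "currentOpen": 3,
        "totalAborted": 5,
        "totalCommitted": 45,
        "totalStarted": 50,
    }
}

summary = summarize_transactions(sample_status)
print(summary["abort_ratio"], summary["commit_ratio"])
```

With PyMongo, the input document could come from client.admin.command("serverStatus"). A rising abort ratio may point to write conflicts or timeouts.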

Transaction Isolation Level

When starting a transaction, you can specify both the readConcern and the writeConcern, as shown in the following example:

mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

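For readConcern, Amazon DocumentDB supports snapshot isolation by default. If a readConcern of local, available, or majority is specified, Amazon DocumentDB upgrades the readConcern level to snapshot. Amazon DocumentDB does not support the linearizable readConcern, and specifying it results in an error.

For writeConcern, Amazon DocumentDB supports majority by default, and a write quorum is achieved when four copies of the data are persisted across three AZs. If a lower writeConcern is specified, Amazon DocumentDB upgrades the writeConcern to majority. In addition, all Amazon DocumentDB writes are journaled, and journaling cannot be disabled.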
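The upgrade rules described above can be modeled in a few lines of Python. This is only a client-side model of the documented behavior, useful for reasoning about or validating configuration. It does not call Amazon DocumentDB, and the function names are assumptions introduced here.

```python
# Levels that the text above says are accepted and served as snapshot.
UPGRADED_READ_LEVELS = {"local", "available", "majority", "snapshot"}


def effective_read_concern(level=None):
    """Return the readConcern level a transaction effectively runs with."""
    if level is None or level in UPGRADED_READ_LEVELS:
        return "snapshot"
    if level == "linearizable":
        raise ValueError("linearizable readConcern is not supported")
    raise ValueError(f"unknown readConcern level: {level!r}")


def effective_write_concern(w=None):
    """Return the writeConcern a transaction effectively runs with.

    Any lower write concern (for example w=1) is upgraded to majority.
    """
    return "majority"


print(effective_read_concern("local"))  # snapshot
print(effective_write_concern(1))       # majority
```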

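Use Cases

In this section, we walk through two use cases for transactions: multi-statement and multi-collection.

Multi-Statement Transactions

Amazon DocumentDB transactions are multi-statement, which means you can write a transaction that spans multiple statements with an explicit commit or rollback. You can group insert, update, delete, and findAndModify actions into a single atomic operation.

A common use case for multi-statement transactions is a debit-credit transaction. For example, suppose you owe a friend money. You need to debit (withdraw) $500 from your account and credit (deposit) $500 to your friend's account. To do that, you perform both the debit and the credit within a single transaction to ensure atomicity. This prevents the situation where $500 is debited from your account but never credited to your friend's account. Here is what this use case looks like: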

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountColl.find();

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // abort instead of committing: Alice's update is rolled back
    session.abortTransaction();

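Multi-Collection Transactions

Our transactions are also multi-collection, which means they can be used to perform multiple operations within a single transaction and across multiple collections. This provides a consistent view of the data and maintains its integrity. When you commit the commands together, the transaction is an all-or-nothing execution: the commands either all succeed or all fail.

Here is an example of a multi-collection transaction, using the same scenario and data as the multi-statement transaction example.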

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var amountToTransfer = 500;
    var collectionName = "account";
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountCollInBankA.find(); // Alice holds $500 in bankA
    accountCollInBankB.find(); // Bob holds $1500 in bankB

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.abortTransaction();
    accountCollInBankA.find(); // Alice holds $1000 in bankA
    accountCollInBankB.find(); // Bob holds $1000 in bankB

Transaction API Examples for the Callback API

The callback API is only available for 4.2+ drivers.

Javascript

The following code demonstrates how to use the Amazon DocumentDB transaction API with Javascript.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert(aliceBalance >= amountToTransfer);
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert.eq(newAliceBalance, findAliceBalance);
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    assert.eq(newBobBalance, findBobBalance);
    session.commitTransaction();
    accountColl.find();

Node.js

The following code demonstrates how to use the Amazon DocumentDB transaction API with Node.js.

    // Node.js callback API:
    // Assumes `mongoclient` is a connected MongoClient and that this code
    // runs inside an async function.
    const assert = require('assert');
    const bankDB = await mongoclient.db("bank");
    var accountColl = await bankDB.createCollection("account");
    var amountToTransfer = 500;
    const session = mongoclient.startSession({causalConsistency: false});
    await accountColl.drop();
    await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
    await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
    const transactionOptions = {
        readConcern: { level: 'snapshot' },
        writeConcern: { w: 'majority' }
    };
    // deduct $500 from Alice's account
    // findOne returns the whole document, so read its balance field
    var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(aliceBalance.balance >= amountToTransfer);
    var newAliceBalance = aliceBalance.balance - amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
    await session.commitTransaction();
    aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(newAliceBalance == aliceBalance.balance);
    // add $500 to Bob's account
    var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    var newBobBalance = bobBalance.balance + amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
    await session.commitTransaction();
    bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    assert(newBobBalance == bobBalance.balance);

C#

The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.

    // C# Callback API
    var dbName = "bank";
    var collName = "account";
    var amountToTransfer = 500;
    using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
    {
        var bankDB = client.GetDatabase(dbName);
        var accountColl = bankDB.GetCollection<BsonDocument>(collName);
        bankDB.DropCollection(collName);
        accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
        accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
        // start transaction
        var transactionOptions = new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority);
        var result = session.WithTransaction(
            (sess, cancellationtoken) =>
            {
                // deduct $500 from Alice's account
                var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance >= amountToTransfer);
                var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
                    Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
                aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance == newAliceBalance);
                // add $500 to Bob's account
                var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
                    Builders<BsonDocument>.Update.Set("balance", newBobBalance));
                bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                Debug.Assert(bobBalance == newBobBalance);
                return "Transaction committed";
            }, transactionOptions);
        // check values outside of transaction
        var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
        var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
        Debug.Assert(aliceNewBalance == 500);
        Debug.Assert(bobNewBalance == 1500);
    }

Ruby

The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.

    # Ruby Callback API
    dbName = "bank"
    collName = "account"
    amountToTransfer = 500
    session = client.start_session(:causal_consistency=> false)
    bankDB = Mongo::Database.new(client, dbName)
    accountColl = bankDB[collName]
    accountColl.drop()
    accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
    accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
    # start transaction
    session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
      # deduct $500 from Alice's account
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert aliceBalance >= amountToTransfer
      newAliceBalance = aliceBalance - amountToTransfer
      accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert_equal(newAliceBalance, aliceBalance)
      # add $500 to Bob's account
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      newBobBalance = bobBalance + amountToTransfer
      accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      assert_equal(newBobBalance, bobBalance)
    end
    # check results outside of transaction
    aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
    bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
    assert_equal(aliceBalance, 500)
    assert_equal(bobBalance, 1500)
    session.end_session

Go

The following code demonstrates how to use the Amazon DocumentDB transaction API with Go.

    // Go - Callback API
    type Account struct {
        Name    string
        Balance int
    }

    ctx := context.TODO()
    dbName := "bank"
    collName := "account"
    amountToTransfer := 500
    session, err := client.StartSession(options.Session().SetCausalConsistency(false))
    assert.NilError(t, err)
    defer session.EndSession(ctx)
    bankDB := client.Database(dbName)
    accountColl := bankDB.Collection(collName)
    accountColl.Drop(ctx)
    _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})
    _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})
    transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
        SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
    _, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {
        var result Account
        // deduct $500 from Alice's account
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        newAliceBalance := aliceBalance - amountToTransfer
        _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance = result.Balance
        assert.Equal(t, aliceBalance, newAliceBalance)
        // add $500 to Bob's account
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        newBobBalance := bobBalance + amountToTransfer
        _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance = result.Balance
        assert.Equal(t, bobBalance, newBobBalance)
        if err != nil {
            return nil, err
        }
        return "transaction committed", err
    }, transactionOptions)
    // check results outside of transaction
    var result Account
    err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
    aliceNewBalance := result.Balance
    err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
    bobNewBalance := result.Balance
    assert.Equal(t, aliceNewBalance, 500)
    assert.Equal(t, bobNewBalance, 1500)

    // Go - Core API
    type Account struct {
        Name    string
        Balance int
    }

    func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {
        amountToTransfer := 500
        transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
            SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
        if err := sessionContext.StartTransaction(transactionOptions); err != nil {
            panic(err)
        }
        var result Account
        // deduct $500 from Alice's account
        err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        newAliceBalance := aliceBalance - amountToTransfer
        _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
        if err != nil {
            // abort and return so the caller can decide whether to retry
            sessionContext.AbortTransaction(sessionContext)
            return err
        }
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance = result.Balance
        assert.Equal(t, aliceBalance, newAliceBalance)
        // add $500 to Bob's account
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        newBobBalance := bobBalance + amountToTransfer
        _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
        if err != nil {
            sessionContext.AbortTransaction(sessionContext)
            return err
        }
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance = result.Balance
        assert.Equal(t, bobBalance, newBobBalance)
        err = sessionContext.CommitTransaction(sessionContext)
        return err
    }

    func doTransactionWithRetry(t *testing.T) {
        ctx := context.TODO()
        dbName := "bank"
        collName := "account"
        bankDB := client.Database(dbName)
        accountColl := bankDB.Collection(collName)
        client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {
            accountColl.Drop(ctx)
            accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})
            accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})
            for {
                err := transferMoneyWithRetry(sessionContext, accountColl, t)
                if err == nil {
                    println("transaction committed")
                    return nil
                }
                // retry only errors labeled as transient; the checked type
                // assertion avoids a panic on other error types
                if mongoErr, ok := err.(mongo.CommandError); ok && mongoErr.HasErrorLabel("TransientTransactionError") {
                    continue
                }
                println("transaction failed")
                return err
            }
        })
        // check results outside of transaction
        var result Account
        accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        assert.Equal(t, aliceBalance, 500)
        accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        assert.Equal(t, bobBalance, 1500)
    }
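The Core API loop above retries for as long as the error carries the TransientTransactionError label. A bounded variant of the same idea can be sketched in Python as follows. The function name run_with_retry, the attempt limit, and the backoff are assumptions introduced for illustration. The only thing taken from the driver is that the exception may expose a has_error_label method, as PyMongo errors do.

```python
import time

TRANSIENT_LABEL = "TransientTransactionError"


def run_with_retry(txn_func, max_attempts=3, backoff_seconds=0.0):
    """Call txn_func() until it succeeds, retrying only transient errors.

    txn_func is expected to start, run, and commit (or abort) one whole
    transaction each time it is called.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_func()
        except Exception as exc:
            has_label = getattr(exc, "has_error_label", None)
            transient = callable(has_label) and has_label(TRANSIENT_LABEL)
            # Give up on non-transient errors or when attempts are exhausted.
            if not transient or attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)
```

Capping the number of attempts keeps a persistent conflict from looping forever. The whole transaction is re-run on each attempt, which matches the retry loop in the Go example.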

Java

The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.

    // Java (sync) - Callback API
    MongoDatabase bankDB = mongoClient.getDatabase("bank");
    MongoCollection<Document> accountColl = bankDB.getCollection("account");
    accountColl.drop();
    int amountToTransfer = 500;
    // add sample data
    accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
    accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
    TransactionOptions txnOptions = TransactionOptions.builder()
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build();
    ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
    try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
        clientSession.withTransaction(new TransactionBody<Void>() {
            @Override
            public Void execute() {
                // deduct $500 from Alice's account
                List<Document> documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int aliceBalance = (int) documentList.get(0).get("balance");
                int newAliceBalance = aliceBalance - amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
                // check Alice's new balance
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newAliceBalance);
                // add $500 to Bob's account
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                int bobBalance = (int) documentList.get(0).get("balance");
                int newBobBalance = bobBalance + amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
                // check Bob's new balance
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newBobBalance);
                return null;
            }
        }, txnOptions);
    }

C

The following code demonstrates how to use the Amazon DocumentDB transaction API with C.

    // Sample Code for C with Callback
    #include <bson.h>
    #include <mongoc.h>
    #include <stdio.h>
    #include <string.h>
    #include <assert.h>

    typedef struct {
        int64_t balance;
        bson_t *account;
        bson_t *opts;
        mongoc_collection_t *collection;
    } ctx_t;

    bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
    {
        bool r = true;
        ctx_t *data = (ctx_t *) ctx;
        bson_t local_reply;
        bson_t *selector = data->account;
        bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
        // report the outcome of the update back to the transaction wrapper
        r = mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
        *reply = bson_copy (&local_reply);
        bson_destroy (&local_reply);
        bson_destroy (update);
        return r;
    }

    void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
        bson_t reply;
        bool r = true;
        const bson_t *doc;
        bson_iter_t iter;
        ctx_t alice_ctx;
        ctx_t bob_ctx;
        bson_error_t error;
        // find query
        bson_t *alice_query = bson_new ();
        BSON_APPEND_UTF8(alice_query, "name", "Alice");
        bson_t *bob_query = bson_new ();
        BSON_APPEND_UTF8(bob_query, "name", "Bob");
        // create session
        // set causal consistency to false
        mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
        mongoc_session_opts_set_causal_consistency (session_opts, false);
        // start the session
        mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
        // add session to options
        bson_t *opts = bson_new();
        mongoc_client_session_append (client_session, opts, &error);
        // deduct 500 from Alice
        // find account balance of Alice
        mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance >= amount_to_transfer);
        int64_t new_alice_balance = alice_balance - amount_to_transfer;
        // set variables which will be used by callback function
        alice_ctx.collection = collection;
        alice_ctx.opts = opts;
        alice_ctx.balance = new_alice_balance;
        alice_ctx.account = alice_query;
        // callback
        r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
        assert(r);
        // find account balance of Alice after transaction
        cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance == new_alice_balance);
        assert(alice_balance == 500);
        // add 500 to bob's balance
        // find account balance of Bob
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
        int64_t new_bob_balance = bob_balance + amount_to_transfer;
        bob_ctx.collection = collection;
        bob_ctx.opts = opts;
        bob_ctx.balance = new_bob_balance;
        bob_ctx.account = bob_query;
        // set read & write concern
        mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
        mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
        mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
        mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
        mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
        mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
        mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
        // callback
        r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
        assert(r);
        // find account balance of Bob after transaction
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        bob_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(bob_balance == new_bob_balance);
        assert(bob_balance == 1500);
        // cleanup
        // (doc is owned by the cursor, so it must not be destroyed separately)
        bson_destroy(alice_query);
        bson_destroy(bob_query);
        mongoc_client_session_destroy(client_session);
        mongoc_session_opts_destroy(session_opts);
        bson_destroy(opts);
        mongoc_transaction_opts_destroy(txn_opts);
        mongoc_read_concern_destroy(read_concern);
        mongoc_write_concern_destroy(write_concern);
        mongoc_cursor_destroy(cursor);
    }

    int main(int argc, char* argv[]) {
        mongoc_init ();
        mongoc_client_t* client = mongoc_client_new (<connection uri>);
        bson_error_t error;
        // connect to bank db
        mongoc_database_t *database = mongoc_client_get_database (client, "bank");
        // access account collection
        mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
        // set amount to transfer
        int64_t amount_to_transfer = 500;
        // delete the collection if already existing
        mongoc_collection_drop(collection, &error);
        // open Alice account
        bson_t *alice_account = bson_new ();
        BSON_APPEND_UTF8(alice_account, "name", "Alice");
        BSON_APPEND_INT64(alice_account, "balance", 1000);
        // open Bob account
        bson_t *bob_account = bson_new ();
        BSON_APPEND_UTF8(bob_account, "name", "Bob");
        BSON_APPEND_INT64(bob_account, "balance", 1000);
        bool r = true;
        r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        test_callback_money_transfer(client, collection, amount_to_transfer);
        return 0;
    }

Python

The following code demonstrates how to use the Amazon DocumentDB transaction API with Python.

    # Sample Python code with callback API
    import pymongo
    from pymongo.read_concern import ReadConcern
    from pymongo.write_concern import WriteConcern

    def callback(session, balance, query):
        # runs inside the transaction that with_transaction() starts
        collection.update_one(query, {'$set': {"balance": balance}}, session=session)

    client = pymongo.MongoClient(<connection uri>)
    rc_snapshot = ReadConcern('snapshot')
    wc_majority = WriteConcern('majority')

    # To start, drop and create an account collection and insert balances for both Alice and Bob
    collection = client.get_database("bank").get_collection("account")
    collection.drop()
    collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
    collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
    amount_to_transfer = 500

    # deduct 500 from Alice's account
    alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert alice_balance >= amount_to_transfer
    new_alice_balance = alice_balance - amount_to_transfer
    # causal consistency must be disabled through the keyword argument
    with client.start_session(causal_consistency=False) as session:
        session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
    updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert updated_alice_balance == new_alice_balance

    # add 500 to Bob's account
    bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert bob_balance >= amount_to_transfer
    new_bob_balance = bob_balance + amount_to_transfer
    with client.start_session(causal_consistency=False) as session:
        session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
    updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert updated_bob_balance == new_bob_balance
    # Sample Python code with Core API
    import pymongo
    from pymongo.read_concern import ReadConcern
    from pymongo.write_concern import WriteConcern

    client = pymongo.MongoClient(<connection_string>)
    rc_snapshot = ReadConcern('snapshot')
    wc_majority = WriteConcern('majority')

    # To start, drop and create an account collection and insert balances for both Alice and Bob
    collection = client.get_database("bank").get_collection("account")
    collection.drop()
    collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
    collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
    amount_to_transfer = 500

    # deduct 500 from Alice's account
    alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert alice_balance >= amount_to_transfer
    new_alice_balance = alice_balance - amount_to_transfer
    # causal consistency must be disabled through the keyword argument
    with client.start_session(causal_consistency=False) as session:
        session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
        collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
        session.commit_transaction()
    updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert updated_alice_balance == new_alice_balance

    # add 500 to Bob's account
    bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert bob_balance >= amount_to_transfer
    new_bob_balance = bob_balance + amount_to_transfer
    with client.start_session(causal_consistency=False) as session:
        session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
        collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
        session.commit_transaction()
    updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert updated_bob_balance == new_bob_balance
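
The Python Core API sample above commits each transaction once, while the C#, Ruby, Java, and Scala samples retry the whole transaction when the error carries the TransientTransactionError label. A minimal Python sketch of that retry loop follows. The names run_with_retry, is_transient, and max_attempts are illustrative assumptions, not driver APIs. The sketch checks has_error_label by duck typing, so it does not import the driver; with PyMongo you would normally catch pymongo.errors.PyMongoError instead.

```python
# Sketch only: run_with_retry, is_transient and max_attempts are illustrative
# names, not part of PyMongo or Amazon DocumentDB.

TRANSIENT_LABEL = "TransientTransactionError"


def is_transient(exc):
    """Return True if the exception carries the transient-transaction label."""
    has_label = getattr(exc, "has_error_label", None)
    return callable(has_label) and has_label(TRANSIENT_LABEL)


def run_with_retry(txn_func, max_attempts=3):
    """Call txn_func() until it succeeds or a non-transient error occurs.

    txn_func must start, run, and commit (or abort) one whole transaction,
    so every retry replays the transaction from the beginning.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_func()
        except Exception as exc:
            if is_transient(exc) and attempt < max_attempts:
                continue  # replay the whole transaction
            raise
```

Unlike the unbounded while loops in the other samples, this sketch stops after max_attempts so that a persistent failure surfaces to the caller.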

Transaction API examples for the Core API

JavaScript

The following code demonstrates how to use the Amazon DocumentDB transaction API with JavaScript.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;

    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();

    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});

    session.startTransaction();

    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert(aliceBalance >= amountToTransfer);
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert.eq(newAliceBalance, findAliceBalance);

    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    assert.eq(newBobBalance, findBobBalance);

    session.commitTransaction();

    accountColl.find();

C#

The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.

    // C# Core API
    public void TransferMoneyWithRetry(IMongoCollection<BsonDocument> accountColl, IClientSessionHandle session)
    {
        var amountToTransfer = 500;

        // start transaction
        var transactionOptions = new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority);
        session.StartTransaction(transactionOptions);
        try
        {
            // deduct $500 from Alice's account
            var aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
            Debug.Assert(aliceBalance >= amountToTransfer);
            var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
            accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
                Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
            aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
            Debug.Assert(aliceBalance == newAliceBalance);

            // add $500 to Bob's account
            var bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
            var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
            accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
                Builders<BsonDocument>.Update.Set("balance", newBobBalance));
            bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
            Debug.Assert(bobBalance == newBobBalance);
        }
        catch (Exception)
        {
            // abort so the transaction is never left open, then let the caller decide whether to retry
            session.AbortTransaction();
            throw;
        }
        session.CommitTransaction();
    }
    public void DoTransactionWithRetry(MongoClient client)
    {
        var dbName = "bank";
        var collName = "account";
        using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
        {
            try
            {
                var bankDB = client.GetDatabase(dbName);
                var accountColl = bankDB.GetCollection<BsonDocument>(collName);
                bankDB.DropCollection(collName);
                accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
                accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });

                while (true)
                {
                    try
                    {
                        TransferMoneyWithRetry(accountColl, session);
                        break;
                    }
                    catch (MongoException e)
                    {
                        // retry the whole transaction only on transient errors
                        if (e.HasErrorLabel("TransientTransactionError"))
                        {
                            continue;
                        }
                        else
                        {
                            throw;
                        }
                    }
                }

                // check values outside of transaction
                var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceNewBalance == 500);
                Debug.Assert(bobNewBalance == 1500);
            }
            catch (Exception e)
            {
                Console.WriteLine("Error running transaction: " + e.Message);
            }
        }
    }

Ruby

The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.

    # Ruby Core API
    def transfer_money_w_retry(session, accountColl)
      amountToTransfer = 500
      session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})

      # deduct $500 from Alice's account
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert aliceBalance >= amountToTransfer
      newAliceBalance = aliceBalance - amountToTransfer
      accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert_equal(newAliceBalance, aliceBalance)

      # add $500 to Bob's account
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      newBobBalance = bobBalance + amountToTransfer
      accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      assert_equal(newBobBalance, bobBalance)

      session.commit_transaction
    end

    def do_txn_w_retry(client)
      dbName = "bank"
      collName = "account"
      session = client.start_session(:causal_consistency=> false)
      bankDB = Mongo::Database.new(client, dbName)
      accountColl = bankDB[collName]
      accountColl.drop()
      accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
      accountColl.insert_one({"name"=>"Bob", "balance"=>1000})

      begin
        transfer_money_w_retry(session, accountColl)
        puts "transaction committed"
      rescue Mongo::Error => e
        # retry re-runs the begin block, replaying the whole transaction
        if e.label?('TransientTransactionError')
          retry
        else
          puts "transaction failed"
          raise
        end
      end

      # check results outside of transaction
      aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
      bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
      assert_equal(aliceBalance, 500)
      assert_equal(bobBalance, 1500)
    end

Java

The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.

    // Java (sync) - Core API
    public void transferMoneyWithRetry() {
        // connect to server
        MongoClientURI mongoURI = new MongoClientURI(uri);
        MongoClient mongoClient = new MongoClient(mongoURI);

        MongoDatabase bankDB = mongoClient.getDatabase("bank");
        MongoCollection<Document> accountColl = bankDB.getCollection("account");
        accountColl.drop();

        // insert some sample data
        accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
        accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));

        while (true) {
            try {
                doTransferMoneyWithRetry(accountColl, mongoClient);
                break;
            } catch (MongoException e) {
                // retry the whole transaction only on transient errors
                if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                    continue;
                } else {
                    throw e;
                }
            }
        }
    }

    public void doTransferMoneyWithRetry(MongoCollection<Document> accountColl, MongoClient mongoClient) {
        int amountToTransfer = 500;

        TransactionOptions txnOptions = TransactionOptions.builder()
            .readConcern(ReadConcern.SNAPSHOT)
            .writeConcern(WriteConcern.MAJORITY)
            .build();
        ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
        try (ClientSession clientSession = mongoClient.startSession(sessionOptions)) {
            clientSession.startTransaction(txnOptions);

            // deduct $500 from Alice's account
            List<Document> documentList = new ArrayList<>();
            accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
            int aliceBalance = (int) documentList.get(0).get("balance");
            Assert.assertTrue(aliceBalance >= amountToTransfer);
            int newAliceBalance = aliceBalance - amountToTransfer;
            accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));

            // check Alice's new balance
            documentList = new ArrayList<>();
            accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
            int updatedBalance = (int) documentList.get(0).get("balance");
            Assert.assertEquals(updatedBalance, newAliceBalance);

            // add $500 to Bob's account
            documentList = new ArrayList<>();
            accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
            int bobBalance = (int) documentList.get(0).get("balance");
            int newBobBalance = bobBalance + amountToTransfer;
            accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));

            // check Bob's new balance
            documentList = new ArrayList<>();
            accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
            updatedBalance = (int) documentList.get(0).get("balance");
            Assert.assertEquals(updatedBalance, newBobBalance);

            // commit transaction
            clientSession.commitTransaction();
        }
    }
    // Java (async) -- Core API
    public void transferMoneyWithRetry() {
        // connect to the server
        MongoClient mongoClient = MongoClients.create(uri);

        MongoDatabase bankDB = mongoClient.getDatabase("bank");
        MongoCollection<Document> accountColl = bankDB.getCollection("account");
        SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();
        mongoClient.getDatabase("bank").drop().subscribe(dropCallback);
        dropCallback.await();

        // insert some sample data
        SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();
        accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);
        insertionCallback.await();

        insertionCallback = new SubscriberLatchWrapper<>();
        accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);
        insertionCallback.await();

        while (true) {
            try {
                doTransferMoneyWithRetry(accountColl, mongoClient);
                break;
            } catch (MongoException e) {
                // retry the whole transaction only on transient errors
                if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                    continue;
                } else {
                    throw e;
                }
            }
        }
    }

    public void doTransferMoneyWithRetry(MongoCollection<Document> accountColl, MongoClient mongoClient) {
        int amountToTransfer = 500;

        // start the transaction
        TransactionOptions txnOptions = TransactionOptions.builder()
            .readConcern(ReadConcern.SNAPSHOT)
            .writeConcern(WriteConcern.MAJORITY)
            .build();

        ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();

        SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();
        mongoClient.startSession(sessionOptions).subscribe(sessionCallback);
        ClientSession session = sessionCallback.get().get(0);
        session.startTransaction(txnOptions);

        // deduct $500 from Alice's account
        SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();
        accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
        Document documentFound = findCallback.get().get(0);
        int aliceBalance = (int) documentFound.get("balance");
        int newAliceBalance = aliceBalance - amountToTransfer;

        SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();
        accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);
        updateCallback.await();

        // check Alice's new balance
        findCallback = new SubscriberLatchWrapper<>();
        accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
        documentFound = findCallback.get().get(0);
        int updatedBalance = (int) documentFound.get("balance");
        Assert.assertEquals(updatedBalance, newAliceBalance);

        // add $500 to Bob's account
        findCallback = new SubscriberLatchWrapper<>();
        accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
        documentFound = findCallback.get().get(0);
        int bobBalance = (int) documentFound.get("balance");
        int newBobBalance = bobBalance + amountToTransfer;

        updateCallback = new SubscriberLatchWrapper<>();
        accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);
        updateCallback.await();

        // check Bob's new balance
        findCallback = new SubscriberLatchWrapper<>();
        accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
        documentFound = findCallback.get().get(0);
        updatedBalance = (int) documentFound.get("balance");
        Assert.assertEquals(updatedBalance, newBobBalance);

        // commit the transaction
        SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();
        session.commitTransaction().subscribe(transactionCallback);
        transactionCallback.await();
    }
    /**
     * A Subscriber that stores the publisher's results and provides a latch so callers can block on completion.
     *
     * @param <T> The publisher's result type
     */
    public class SubscriberLatchWrapper<T> implements Subscriber<T> {
        private final List<T> received;
        private final List<RuntimeException> errors;
        private final CountDownLatch latch;
        private volatile Subscription subscription;
        private volatile boolean completed;

        /**
         * Construct an instance
         */
        public SubscriberLatchWrapper() {
            this.received = new ArrayList<>();
            this.errors = new ArrayList<>();
            this.latch = new CountDownLatch(1);
        }

        @Override
        public void onSubscribe(final Subscription s) {
            subscription = s;
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(final T t) {
            received.add(t);
        }

        @Override
        public void onError(final Throwable t) {
            if (t instanceof RuntimeException) {
                errors.add((RuntimeException) t);
            } else {
                errors.add(new RuntimeException("Unexpected exception", t));
            }
            onComplete();
        }

        @Override
        public void onComplete() {
            completed = true;
            subscription.cancel();
            latch.countDown();
        }

        /**
         * Get received elements without waiting for completion
         *
         * @return the list of received elements
         */
        public List<T> getReceived() {
            return received;
        }

        /**
         * Wait for completion, then get received elements
         *
         * @return the list of received elements
         */
        public List<T> get() {
            return await().getReceived();
        }

        /**
         * Await completion or error
         *
         * @return this
         */
        public SubscriberLatchWrapper<T> await() {
            subscription.request(Integer.MAX_VALUE);
            try {
                if (!latch.await(300, TimeUnit.SECONDS)) {
                    throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");
                }
            } catch (InterruptedException e) {
                throw new MongoInterruptedException("Interrupted waiting for observation", e);
            }
            if (!errors.isEmpty()) {
                throw errors.get(0);
            }
            return this;
        }

        public boolean getCompleted() {
            return this.completed;
        }

        public void close() {
            subscription.cancel();
            received.clear();
        }
    }

C

The following code demonstrates how to use the Amazon DocumentDB transaction API with C.

    // Sample C code with core session
    bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
        bool r = true;
        bson_error_t error;
        bson_t *opts = bson_new();
        bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");

        // set read & write concern
        mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
        mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
        mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();

        mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
        mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
        mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
        mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);

        // start the transaction and attach the session to the update options
        r = mongoc_client_session_start_transaction (client_session, txn_opts, &error)
            && mongoc_client_session_append (client_session, opts, &error);
        if (r) {
            r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
            if (r) {
                // commit on success
                r = mongoc_client_session_commit_transaction (client_session, NULL, &error);
            } else {
                // abort on failure so the transaction is not left open
                mongoc_client_session_abort_transaction (client_session, &error);
            }
        }

        bson_destroy (opts);
        mongoc_transaction_opts_destroy(txn_opts);
        mongoc_read_concern_destroy(read_concern);
        mongoc_write_concern_destroy(write_concern);
        bson_destroy (update);
        return r;
    }
    void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
        bool r = true;
        const bson_t *doc;
        bson_iter_t iter;
        bson_error_t error;

        // find query
        bson_t *alice_query = bson_new ();
        BSON_APPEND_UTF8(alice_query, "name", "Alice");

        bson_t *bob_query = bson_new ();
        BSON_APPEND_UTF8(bob_query, "name", "Bob");

        // create session
        // set causal consistency to false
        mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
        mongoc_session_opts_set_causal_consistency (session_opts, false);
        // start the session
        mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);

        // add session to options
        bson_t *opts = bson_new();
        mongoc_client_session_append (client_session, opts, &error);

        // deduct 500 from Alice
        // find account balance of Alice
        mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance >= amount_to_transfer);
        int64_t new_alice_balance = alice_balance - amount_to_transfer;
        mongoc_cursor_destroy(cursor);

        // core
        r = core_session (client_session, collection, alice_query, new_alice_balance);
        assert(r);

        // find account balance of Alice after transaction
        cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance == new_alice_balance);
        assert(alice_balance == 500);
        mongoc_cursor_destroy(cursor);

        // add 500 to Bob's balance
        // find account balance of Bob
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
        int64_t new_bob_balance = bob_balance + amount_to_transfer;
        mongoc_cursor_destroy(cursor);

        // core
        r = core_session (client_session, collection, bob_query, new_bob_balance);
        assert(r);

        // find account balance of Bob after transaction
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        bob_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(bob_balance == new_bob_balance);
        assert(bob_balance == 1500);

        // cleanup (doc is owned by the cursor and must not be destroyed separately)
        mongoc_cursor_destroy(cursor);
        bson_destroy(alice_query);
        bson_destroy(bob_query);
        bson_destroy(opts);
        mongoc_client_session_destroy(client_session);
        mongoc_session_opts_destroy(session_opts);
    }
    int main(int argc, char* argv[]) {
        mongoc_init ();
        mongoc_client_t* client = mongoc_client_new (<connection uri>);
        bson_error_t error;

        // connect to bank db
        mongoc_database_t *database = mongoc_client_get_database (client, "bank");
        // access account collection
        mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
        // set amount to transfer
        int64_t amount_to_transfer = 500;
        // delete the collection if already existing
        mongoc_collection_drop(collection, &error);

        // open Alice account
        bson_t *alice_account = bson_new ();
        BSON_APPEND_UTF8(alice_account, "name", "Alice");
        BSON_APPEND_INT64(alice_account, "balance", 1000);

        // open Bob account
        bson_t *bob_account = bson_new ();
        BSON_APPEND_UTF8(bob_account, "name", "Bob");
        BSON_APPEND_INT64(bob_account, "balance", 1000);

        bool r = true;

        r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}

        test_core_money_transfer(client, collection, amount_to_transfer);

        // release driver resources
        bson_destroy(alice_account);
        bson_destroy(bob_account);
        mongoc_collection_destroy(collection);
        mongoc_database_destroy(database);
        mongoc_client_destroy(client);
        mongoc_cleanup ();
        return 0;
    }

Scala

The following code demonstrates how to use the Amazon DocumentDB transaction API with Scala.

    // Scala Core API
    def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {
      val accountColl = database.getCollection("account")
      var amountToTransfer = 500

      var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
        clientSession.startTransaction()

        // deduct $500 from Alice's account
        var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
        assert(aliceBalance >= amountToTransfer)
        var newAliceBalance = aliceBalance - amountToTransfer
        accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
        aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
        assert(aliceBalance == newAliceBalance)

        // add $500 to Bob's account
        var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
        var newBobBalance = bobBalance + amountToTransfer
        accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
        bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
        assert(bobBalance == newBobBalance)

        clientSession
      })

      transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
    }

    def doTransactionWithRetry(): Unit = {
      val client: MongoClient = MongoClientWrapper.getMongoClient()
      val database: MongoDatabase = client.getDatabase("bank")
      val accountColl = database.getCollection("account")
      accountColl.drop().await()

      val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
      var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
      accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
      accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()

      var retry = true
      while (retry) {
        try {
          transferMoneyWithRetry(sessionObservable, database)
          println("transaction committed")
          retry = false
        }
        catch {
          case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
            println("retrying transaction")
          }
          case other: Throwable => {
            println("transaction failed")
            retry = false
            throw other
          }
        }
      }

      // check results outside of transaction
      assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
      assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)

      accountColl.drop().await()
    }

Supported commands

The following commands are supported:

  • abortTransaction

  • commitTransaction

  • endSessions

  • killSession

  • killAllSession

  • killAllSessionsByPattern

  • refreshSessions

  • startSession

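Unsupported capabilities

The following stages or commands are not supported, listed by method:

  • db.collection.aggregate(): $collStats, $currentOp, $indexStats, $listSessions, $out

  • db.collection.count() and db.collection.countDocuments(): $where, $near, $nearSphere

  • db.collection.insert(): insert is not supported if it is not run against an existing collection. The method is supported if it targets a pre-existing collection.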

Sessions

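MongoDB sessions are a framework used to support retryable writes, causal consistency, transactions, and the management of operations across shards. When a session is created, the client generates a logical session identifier (lsid), which is used to tag all operations within that session when commands are sent to the server.

Amazon DocumentDB supports the use of sessions to enable transactions, but does not support causal consistency or retryable writes.

When you use transactions in Amazon DocumentDB, a transaction is initiated from within a session using the session.startTransaction() API, and a session supports a single transaction at a time. Similarly, a transaction is completed using either the commit (session.commitTransaction()) or abort (session.abortTransaction()) API.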
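
The start, commit, and abort lifecycle described here can be wrapped in one small helper. The following Python sketch is an illustration under stated assumptions: run_in_transaction is a hypothetical name, not a driver API, and session is assumed to behave like a PyMongo ClientSession (start_transaction, commit_transaction, abort_transaction, and the in_transaction property).

```python
# Sketch only: run_in_transaction is an illustrative helper, not a driver API.
# `session` is assumed to behave like pymongo.client_session.ClientSession.


def run_in_transaction(session, work, **txn_options):
    """Run work(session) in one transaction; commit on success, abort on error."""
    if session.in_transaction:
        # A session supports a single transaction at a time.
        raise RuntimeError("session already has an open transaction")
    session.start_transaction(**txn_options)
    try:
        result = work(session)
    except Exception:
        # Never leave the transaction open: abort, then surface the error.
        session.abort_transaction()
        raise
    session.commit_transaction()
    return result
```

A caller would pass a function that performs its reads and writes with session=session, plus any read_concern or write_concern keyword arguments for the transaction.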

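Causal consistency

Causal consistency guarantees that, within a single client session, the client observes read-after-write consistency, monotonic reads and writes, and writes that follow reads. These guarantees apply across all instances in a cluster, not just the primary. Amazon DocumentDB does not support causal consistency, and the following statements result in an error.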

    var mySession = db.getMongo().startSession();
    var mySessionObject = mySession.getDatabase('test').getCollection('account');

    mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
    //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }

    mySessionObject.find()
    //Error: error: {
    //        "ok" : 0,
    //        "code" : 303,
    //        "errmsg" : "Feature not supported: 'causal consistency'",
    //        "operationTime" : Timestamp(1603461817, 493214)
    //}

    mySession.endSession()

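You can disable causal consistency within a session. Doing so lets you use the session framework, but provides no causal consistency guarantees for reads. With Amazon DocumentDB, reads from the primary are read-after-write consistent, and reads from replica instances are eventually consistent. Transactions are the primary use case for sessions.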

    var mySession = db.getMongo().startSession({causalConsistency: false});
    var mySessionObject = mySession.getDatabase('test').getCollection('account');

    mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
    //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }

    mySessionObject.find()
    //{ "_id" : 1, "name" : "Bob", "balance" : 100 }
    //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

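Retryable writes

Retryable writes are a capability in which the client retries a write operation one time when a network error occurs or when the client cannot find the primary. Amazon DocumentDB does not support retryable writes, so they must be disabled. You can disable them by setting retryWrites=false in the connection string. The following is an example: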

    mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
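
If connection strings are assembled in application code, the option can be enforced programmatically. The following Python sketch shows one possible approach using only the standard library. The function name disable_retry_writes and the host name in the usage note are assumptions made for illustration.

```python
from urllib.parse import urlsplit, urlunsplit


def disable_retry_writes(uri):
    """Return uri with retryWrites=false, replacing any existing retryWrites option."""
    parts = urlsplit(uri)
    # Keep every option except an existing retryWrites (option names are case-insensitive).
    options = [
        opt for opt in parts.query.split("&")
        if opt and opt.split("=", 1)[0].lower() != "retrywrites"
    ]
    options.append("retryWrites=false")
    # A MongoDB URI needs a "/" between the host list and the "?" options.
    path = parts.path or "/"
    return urlunsplit((parts.scheme, parts.netloc, path, "&".join(options), parts.fragment))
```

For example, disable_retry_writes("mongodb://example-host:27017/?ssl=true&retryWrites=true") returns "mongodb://example-host:27017/?ssl=true&retryWrites=false".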

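Transaction errors

When you use transactions, some scenarios can produce an error stating that a transaction number does not match any in-progress transaction.

The error can be generated in at least two different scenarios: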

  • After the one-minute transaction timeout.
  • After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can’t tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.

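The best way to handle this error is to make transactional updates idempotent, for example by using the $set mutator instead of an increment/decrement operation. The error has the following form: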

    { "ok" : 0,
    "operationTime" : Timestamp(1603938167, 1),
    "code" : 251,
    "errmsg" : "Given transaction number 1 does not match any in-progress transactions."
    }
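
To see why idempotent updates make this error safe to handle by replaying the transaction, consider the following toy model in Python. It is a sketch, not driver code: apply_update is a hypothetical function that mimics the effect of $set and $inc on a plain dictionary so the difference is visible without a database connection.

```python
# Toy model only: apply_update mimics the effect of $set and $inc on a plain
# dict so the difference can be shown without a database connection.


def apply_update(doc, update):
    """Return a copy of doc after applying a {'$set': ...} or {'$inc': ...} update."""
    result = dict(doc)
    for field, value in update.get("$set", {}).items():
        result[field] = value
    for field, delta in update.get("$inc", {}).items():
        result[field] = result.get(field, 0) + delta
    return result


account = {"name": "Alice", "balance": 1000}

# Replaying a $set after an ambiguous commit leaves the same final state.
set_update = {"$set": {"balance": 500}}
once = apply_update(account, set_update)
twice = apply_update(once, set_update)

# Replaying a $inc applies the change a second time.
inc_update = {"$inc": {"balance": -500}}
inc_once = apply_update(account, inc_update)
inc_twice = apply_update(inc_once, inc_update)

print(once["balance"], twice["balance"])          # 500 500
print(inc_once["balance"], inc_twice["balance"])  # 500 0
```

If the first attempt actually committed before the instance restarted, replaying the $set version leaves the balance at 500, while replaying the $inc version would deduct the amount twice.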