Transactions

Amazon DocumentDB (with MongoDB compatibility) now supports MongoDB 4.0 compatibility including transactions. You can perform transactions across multiple documents, statements, collections, and databases. Transactions simplify application development by enabling you to perform atomic, consistent, isolated, and durable (ACID) operations across one or more documents within an Amazon DocumentDB cluster. Common use cases for transactions include financial processing, fulfilling and managing orders, and building multi-player games.

There is no additional cost for transactions. You only pay for the read and write IOs that you consume as part of the transactions.

Requirements

To use the transactions feature, you need to meet the following requirements:

  • You must be using the Amazon DocumentDB 4.0 engine.

  • You must use a driver compatible with MongoDB 4.0 or greater.

Best Practices

Here are some best practices so that you can get the most out of transactions with Amazon DocumentDB.

  • Always commit or abort the transaction after it is complete. Leaving a transaction in an incomplete state ties up database resources and can cause write conflicts.

  • Keep each transaction to the smallest number of commands it needs. If a multi-statement transaction can be divided into several smaller transactions, do so to reduce the likelihood of a timeout. Aim for short transactions and avoid long-running reads inside them.
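
The first practice above can be enforced with a small wrapper that makes sure every transaction ends in either a commit or an abort. The following is a minimal sketch, not a driver API: runInTransaction is a hypothetical helper name, and it assumes only a session object that exposes startTransaction, commitTransaction, and abortTransaction, as Node.js driver sessions do.

```javascript
// Hypothetical helper (not part of any driver): runs `work` inside a
// transaction and guarantees the transaction is committed or aborted
// before control returns to the caller.
async function runInTransaction(session, work, options = {}) {
  session.startTransaction(options);
  try {
    const result = await work(session);
    await session.commitTransaction();
    return result;
  } catch (err) {
    // Abort so the transaction does not stay open and hold resources.
    try {
      await session.abortTransaction();
    } catch (abortErr) {
      // The abort itself can fail (for example, if the transaction has
      // already timed out); the original error is the one worth reporting.
    }
    throw err;
  }
}
```

A caller would pass its own session and a function containing only the statements that must be atomic, which also keeps the transaction short.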

Limitations

  • Amazon DocumentDB does not support cursors within a transaction.

  • Amazon DocumentDB cannot create new collections in a transaction and cannot query or update nonexistent collections.

  • Document-level write locks are subject to a one-minute timeout, which is not configurable by the user.

  • Amazon DocumentDB does not support retryable writes, retryable commits, or retryable aborts.

  • Each Amazon DocumentDB instance has an upper limit on the number of concurrent transactions open on the instance at one time. For the limits, please see Instance Limits.

  • For a given transaction, the transaction log size must be less than 32MB.

  • Amazon DocumentDB does support count() within a transaction, but not all drivers support this capability. An alternative is to use the countDocuments() API, which translates the count query into an aggregation query on the client side.

  • Transactions have a one-minute execution limit and sessions have a 30-minute timeout. If a transaction times out, it is aborted, and any subsequent commands issued within the session for the existing transaction yield the following error:

    WriteCommandError({
        "ok" : 0,
        "operationTime" : Timestamp(1603491424, 627726),
        "code" : 251,
        "errmsg" : "Given transaction number 0 does not match any in-progress transactions."
    })
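
Because retryable commits are not supported and a timed-out transaction surfaces as error code 251, an application that wants a retry has to rerun the whole transaction itself. The sketch below shows one way to do that under stated assumptions: isRestartable and retryTransaction are hypothetical helper names, and the error shape (a numeric code property and an optional hasErrorLabel method) is assumed to match what the Node.js driver exposes.

```javascript
// Error code taken from the sample error message above.
const NO_SUCH_TRANSACTION = 251;

// Hypothetical helper: decides whether the whole transaction may be rerun.
function isRestartable(err) {
  if (!err) return false;
  if (err.code === NO_SUCH_TRANSACTION) return true;
  return typeof err.hasErrorLabel === "function" &&
    err.hasErrorLabel("TransientTransactionError");
}

// Hypothetical helper: reruns `runOnce` (which must start, run, and commit
// one complete transaction) up to `maxAttempts` times.
async function retryTransaction(runOnce, maxAttempts = 3) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await runOnce(attempt);
    } catch (err) {
      if (!isRestartable(err)) throw err; // not a transaction-level failure
      lastError = err;
    }
  }
  throw lastError;
}
```

Rerunning is only safe when the function passed in starts a fresh transaction on every attempt; whether a given failure should be retried at all remains an application decision.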

Monitoring and Diagnostics

With the support for transactions in Amazon DocumentDB 4.0, additional CloudWatch metrics were added to help you monitor your transactions.

New CloudWatch Metrics

  • DatabaseTransactions: The number of open transactions, measured over a one-minute period.

  • DatabaseTransactionsAborted: The number of aborted transactions, measured over a one-minute period.

  • DatabaseTransactionsMax: The maximum number of open transactions in a one-minute period.

  • TransactionsAborted: The number of transactions aborted on an instance in a one-minute period.

  • TransactionsCommitted: The number of transactions committed on an instance in a one-minute period.

  • TransactionsOpen: The number of transactions open on an instance, measured over a one-minute period.

  • TransactionsOpenMax: The maximum number of transactions open on an instance in a one-minute period.

  • TransactionsStarted: The number of transactions started on an instance in a one-minute period.

Note

For more CloudWatch metrics for Amazon DocumentDB, go to Monitoring Amazon DocumentDB with CloudWatch.

Additionally, new fields were added to both currentOp (lsid, transactionThreadId, and a new “idle transaction” state) and serverStatus (transactions: currentActive, currentInactive, currentOpen, totalAborted, totalCommitted, and totalStarted).
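
The serverStatus counters named above can be reduced to a few ratios for a quick health check. This is only a sketch: summarizeTransactions is a hypothetical helper, and it assumes the input is an object holding those counters as numeric values (for example, the transactions document read from serverStatus).

```javascript
// Hypothetical helper: condenses the transaction counters into ratios.
// Assumes `txn` carries the numeric fields totalStarted, totalAborted,
// totalCommitted, and currentOpen.
function summarizeTransactions(txn) {
  const started = Number(txn.totalStarted) || 0;
  const aborted = Number(txn.totalAborted) || 0;
  const committed = Number(txn.totalCommitted) || 0;
  return {
    open: Number(txn.currentOpen) || 0,
    // Guard against division by zero on an instance with no transactions yet.
    abortRate: started === 0 ? 0 : aborted / started,
    commitRate: started === 0 ? 0 : committed / started,
  };
}
```

A rising abortRate is a reasonable prompt to look for write conflicts or transactions that run into the execution limit.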

Transaction Isolation Level

When starting a transaction, you can specify both the readConcern and the writeConcern, as shown in the example below:

mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

For readConcern, Amazon DocumentDB supports snapshot isolation by default. If a readConcern of local, available, or majority is specified, Amazon DocumentDB will upgrade the readConcern level to snapshot. Amazon DocumentDB does not support the linearizable readConcern, and specifying such a read concern will result in an error.

For writeConcern, Amazon DocumentDB supports majority by default and a write quorum is achieved when four copies of the data are persisted across three AZs. If a lower writeConcern is specified, Amazon DocumentDB will upgrade the writeConcern to majority. Further, all Amazon DocumentDB writes are journaled and journaling cannot be disabled.
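
The upgrade rules in the two paragraphs above can be modeled as a small pure function, which is handy for documenting or unit-testing what an application should expect. This is a sketch of the described behavior only: effectiveTransactionOptions is a hypothetical name, it does not contact a cluster, and the error text is illustrative rather than the message the service returns.

```javascript
// Read concern levels that the text above says are accepted (and, apart
// from snapshot itself, upgraded to snapshot).
const ACCEPTED_LEVELS = new Set(["local", "available", "majority", "snapshot"]);

// Hypothetical model of the documented rules; not a driver or service API.
function effectiveTransactionOptions(requested = {}) {
  const level =
    (requested.readConcern && requested.readConcern.level) || "snapshot";
  if (!ACCEPTED_LEVELS.has(level)) {
    // Covers linearizable, which the text above says results in an error.
    throw new Error(`readConcern "${level}" is not supported in a transaction`);
  }
  // Any accepted read concern becomes snapshot, and any lower write
  // concern becomes majority.
  return {
    readConcern: { level: "snapshot" },
    writeConcern: { w: "majority" },
  };
}
```

For instance, a request for local reads with w: 1 maps to snapshot and majority, which matches the options used in the startTransaction example.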

Use Cases

In this section, we will walk through two use cases for transactions: multi-statement and multi-collection.

Multi-Statement Transactions

Amazon DocumentDB transactions are multi-statement, which means you can write a transaction that spans multiple statements with an explicit commit or rollback. You can group insert, update, delete, and findAndModify actions as a single atomic operation.

A common use case for multi-statement transactions is a debit-credit transaction. For example: you owe a friend money for clothes. Thus, you need to debit (withdraw) $500 from your account and credit (deposit) $500 to your friend’s account. To perform that operation, you perform both the debit and credit operations within a single transaction to ensure atomicity. Doing so prevents scenarios where $500 is debited from your account, but not credited to your friend’s account. Here’s what this use case would look like:

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountColl.find();

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // abort instead of committing, so Alice's balance stays unchanged
    session.abortTransaction();

Multi-Collection Transactions

Amazon DocumentDB transactions are also multi-collection, which means they can be used to perform multiple operations within a single transaction and across multiple collections. This provides a consistent view of data and maintains your data’s integrity. When you commit the commands as a single transaction, they execute as all-or-nothing: they either all succeed or all fail.

Here is an example of multi-collection transactions, using the same scenario and data from the example for multi-statement transactions.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var amountToTransfer = 500;
    var collectionName = "account";
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountCollInBankA.find(); // Alice holds $500 in bankA
    accountCollInBankB.find(); // Bob holds $1500 in bankB

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.abortTransaction();
    accountCollInBankA.find(); // Alice holds $1000 in bankA
    accountCollInBankB.find(); // Bob holds $1000 in bankB

Transaction API Examples for Callback API

The callback API is only available for 4.2+ drivers.

JavaScript

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with JavaScript.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert(aliceBalance >= amountToTransfer);
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert.eq(newAliceBalance, findAliceBalance);
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    assert.eq(newBobBalance, findBobBalance);
    session.commitTransaction();
    accountColl.find();

Node.js

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Node.js.

    // Node.js callback API:
    const bankDB = await mongoclient.db("bank");
    var accountColl = await bankDB.createCollection("account");
    var amountToTransfer = 500;
    const session = mongoclient.startSession({causalConsistency: false});
    await accountColl.drop();
    await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
    await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
    const transactionOptions = {
        readConcern: { level: 'snapshot' },
        writeConcern: { w: 'majority' }
    };
    // deduct $500 from Alice's account
    var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(aliceBalance.balance >= amountToTransfer);
    // findOne returns the whole document, so read its balance field
    var newAliceBalance = aliceBalance.balance - amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
    await session.commitTransaction();
    aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(newAliceBalance == aliceBalance.balance);
    // add $500 to Bob's account
    var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    var newBobBalance = bobBalance.balance + amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
    await session.commitTransaction();
    bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    assert(newBobBalance == bobBalance.balance);
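
The listing above starts and commits each transaction explicitly, so the debit and the credit are two separate transactions. With a 4.2+ driver, the same transfer could instead be wrapped in the session's withTransaction callback so that both updates commit or abort together. The following is a sketch under stated assumptions: transferWithCallback is a hypothetical helper name, and it assumes a Node.js driver session offering withTransaction(fn, options) and a collection offering findOne and updateOne.

```javascript
// Hypothetical helper: moves `amount` between two accounts inside one
// callback-style transaction. `session` and `accountColl` are supplied by
// the caller (for example, from mongoclient.startSession()).
async function transferWithCallback(session, accountColl, from, to, amount) {
  const transactionOptions = {
    readConcern: { level: "snapshot" },
    writeConcern: { w: "majority" },
  };
  await session.withTransaction(async () => {
    const source = await accountColl.findOne({ name: from }, { session });
    const target = await accountColl.findOne({ name: to }, { session });
    if (source.balance < amount) {
      // Throwing inside the callback makes the driver abort the transaction.
      throw new Error("insufficient funds");
    }
    await accountColl.updateOne(
      { name: from },
      { $set: { balance: source.balance - amount } },
      { session }
    );
    await accountColl.updateOne(
      { name: to },
      { $set: { balance: target.balance + amount } },
      { session }
    );
  }, transactionOptions);
}
```

Passing the session to every operation inside the callback is what ties the reads and writes to the transaction; an operation issued without it runs outside the transaction.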

C#

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C#.

    // C# Callback API
    var dbName = "bank";
    var collName = "account";
    var amountToTransfer = 500;
    using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
    {
        var bankDB = client.GetDatabase(dbName);
        var accountColl = bankDB.GetCollection<BsonDocument>(collName);
        bankDB.DropCollection(collName);
        accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
        accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
        // start transaction
        var transactionOptions = new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority);
        var result = session.WithTransaction(
            (sess, cancellationtoken) =>
            {
                // deduct $500 from Alice's account
                var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance >= amountToTransfer);
                var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
                    Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
                aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance == newAliceBalance);
                // add $500 to Bob's account
                var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
                    Builders<BsonDocument>.Update.Set("balance", newBobBalance));
                bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                Debug.Assert(bobBalance == newBobBalance);
                return "Transaction committed";
            }, transactionOptions);
        // check values outside of transaction
        var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
        var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
        Debug.Assert(aliceNewBalance == 500);
        Debug.Assert(bobNewBalance == 1500);
    }

Ruby

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Ruby.

    # Ruby Callback API
    dbName = "bank"
    collName = "account"
    amountToTransfer = 500
    session = client.start_session(:causal_consistency=> false)
    bankDB = Mongo::Database.new(client, dbName)
    accountColl = bankDB[collName]
    accountColl.drop()
    accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
    accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
    # start transaction
    session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
        # deduct $500 from Alice's account
        aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
        assert aliceBalance >= amountToTransfer
        newAliceBalance = aliceBalance - amountToTransfer
        accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
        aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
        assert_equal(newAliceBalance, aliceBalance)
        # add $500 to Bob's account
        bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
        newBobBalance = bobBalance + amountToTransfer
        accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
        bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
        assert_equal(newBobBalance, bobBalance)
    end
    # check results outside of transaction
    aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
    bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
    assert_equal(aliceBalance, 500)
    assert_equal(bobBalance, 1500)
    session.end_session

Go

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Go.

    // Go - Callback API
    type Account struct {
        Name    string
        Balance int
    }
    ctx := context.TODO()
    dbName := "bank"
    collName := "account"
    amountToTransfer := 500
    session, err := client.StartSession(options.Session().SetCausalConsistency(false))
    assert.NilError(t, err)
    defer session.EndSession(ctx)
    bankDB := client.Database(dbName)
    accountColl := bankDB.Collection(collName)
    accountColl.Drop(ctx)
    _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})
    _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})
    transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
        SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
    _, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {
        var result Account
        // deduct $500 from Alice's account
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        newAliceBalance := aliceBalance - amountToTransfer
        _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance = result.Balance
        assert.Equal(t, aliceBalance, newAliceBalance)
        // add $500 to Bob's account
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        newBobBalance := bobBalance + amountToTransfer
        _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
        err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance = result.Balance
        assert.Equal(t, bobBalance, newBobBalance)
        if err != nil {
            return nil, err
        }
        return "transaction committed", err
    }, transactionOptions)
    // check results outside of transaction
    var result Account
    err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
    aliceNewBalance := result.Balance
    err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
    bobNewBalance := result.Balance
    assert.Equal(t, aliceNewBalance, 500)
    assert.Equal(t, bobNewBalance, 1500)

    // Go - Core API
    type Account struct {
        Name    string
        Balance int
    }
    func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {
        amountToTransfer := 500
        transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
            SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
        if err := sessionContext.StartTransaction(transactionOptions); err != nil {
            panic(err)
        }
        var result Account
        // deduct $500 from Alice's account
        err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        newAliceBalance := aliceBalance - amountToTransfer
        _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
        if err != nil {
            sessionContext.AbortTransaction(sessionContext)
            // stop here so the caller can decide whether to retry
            return err
        }
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance = result.Balance
        assert.Equal(t, aliceBalance, newAliceBalance)
        // add $500 to Bob's account
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        newBobBalance := bobBalance + amountToTransfer
        _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
        if err != nil {
            sessionContext.AbortTransaction(sessionContext)
            return err
        }
        err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance = result.Balance
        assert.Equal(t, bobBalance, newBobBalance)
        err = sessionContext.CommitTransaction(sessionContext)
        return err
    }
    func doTransactionWithRetry(t *testing.T) {
        ctx := context.TODO()
        dbName := "bank"
        collName := "account"
        bankDB := client.Database(dbName)
        accountColl := bankDB.Collection(collName)
        client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {
            accountColl.Drop(ctx)
            accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})
            accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})
            for {
                err := transferMoneyWithRetry(sessionContext, accountColl, t)
                if err == nil {
                    println("transaction committed")
                    return nil
                }
                // retry only when the error is a command error labeled as transient
                if mongoErr, ok := err.(mongo.CommandError); ok && mongoErr.HasErrorLabel("TransientTransactionError") {
                    continue
                }
                println("transaction failed")
                return err
            }
        })
        // check results outside of transaction
        var result Account
        accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
        aliceBalance := result.Balance
        assert.Equal(t, aliceBalance, 500)
        accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
        bobBalance := result.Balance
        assert.Equal(t, bobBalance, 1500)
    }

Java

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Java.

    // Java (sync) - Callback API
    MongoDatabase bankDB = mongoClient.getDatabase("bank");
    MongoCollection<Document> accountColl = bankDB.getCollection("account");
    accountColl.drop();
    int amountToTransfer = 500;
    // add sample data
    accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
    accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
    TransactionOptions txnOptions = TransactionOptions.builder()
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build();
    ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
    try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
        clientSession.withTransaction(new TransactionBody<Void>() {
            @Override
            public Void execute() {
                // deduct $500 from Alice's account
                List<Document> documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int aliceBalance = (int) documentList.get(0).get("balance");
                int newAliceBalance = aliceBalance - amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
                // check Alice's new balance
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newAliceBalance);
                // add $500 to Bob's account
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                int bobBalance = (int) documentList.get(0).get("balance");
                int newBobBalance = bobBalance + amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
                // check Bob's new balance
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newBobBalance);
                return null;
            }
        }, txnOptions);
    }

C

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C.

  1. // Sample Code for C with Callback
  2. #include <bson.h>
  3. #include <mongoc.h>
  4. #include <stdio.h>
  5. #include <string.h>
  6. #include <assert.h>
  7. typedef struct {
  8. int64_t balance;
  9. bson_t *account;
  10. bson_t *opts;
  11. mongoc_collection_t *collection;
  12. } ctx_t;
  13. bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
  14. {
  15. bool r = true;
  16. ctx_t *data = (ctx_t *) ctx;
  17. bson_t local_reply;
  18. bson_t *selector = data->account;
  19. bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
  20. mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
  21. *reply = bson_copy (&local_reply);
  22. bson_destroy (&local_reply);
  23. bson_destroy (update);
  24. return r;
  25. }
  26. void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
  27. bson_t reply;
  28. bool r = true;
  29. const bson_t *doc;
  30. bson_iter_t iter;
  31. ctx_t alice_ctx;
  32. ctx_t bob_ctx;
  33. bson_error_t error;
  34. // find query
  35. bson_t *alice_query = bson_new ();
  36. BSON_APPEND_UTF8(alice_query, "name", "Alice");
  37. bson_t *bob_query = bson_new ();
  38. BSON_APPEND_UTF8(bob_query, "name", "Bob");
  39. // create session
  40. // set causal consistency to false
  41. mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
  42. mongoc_session_opts_set_causal_consistency (session_opts, false);
  43. // start the session
  44. mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
  45. // add session to options
  46. bson_t *opts = bson_new();
  47. mongoc_client_session_append (client_session, opts, &error);
  48. // deduct 500 from Alice
  49. // find account balance of Alice
  50. mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
  51. mongoc_cursor_next (cursor, &doc);
  52. bson_iter_init (&iter, doc);
  53. bson_iter_find (&iter, "balance");
  54. int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
  55. assert(alice_balance >= amount_to_transfer);
  56. int64_t new_alice_balance = alice_balance - amount_to_transfer;
  57. // set variables which will be used by callback function
  58. alice_ctx.collection = collection;
  59. alice_ctx.opts = opts;
  60. alice_ctx.balance = new_alice_balance;
  61. alice_ctx.account = alice_query;
  62. // callback
  63. r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
  64. assert(r);
  65. // find account balance of Alice after transaction
  66. cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
  67. mongoc_cursor_next (cursor, &doc);
  68. bson_iter_init (&iter, doc);
  69. bson_iter_find (&iter, "balance");
  70. alice_balance = (bson_iter_value (&iter))->value.v_int64;
  71. assert(alice_balance == new_alice_balance);
  72. assert(alice_balance == 500);
  73. // add 500 to bob's balance
  74. // find account balance of Bob
  75. cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
  76. mongoc_cursor_next (cursor, &doc);
  77. bson_iter_init (&iter, doc);
  78. bson_iter_find (&iter, "balance");
  79. int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
  80. int64_t new_bob_balance = bob_balance + amount_to_transfer;
  81. bob_ctx.collection = collection;
  82. bob_ctx.opts = opts;
  83. bob_ctx.balance = new_bob_balance;
  84. bob_ctx.account = bob_query;
  85. // set read & write concern
  86. mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
  87. mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
  88. mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
  89. mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
  90. mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
  91. mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
  92. mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
  93. // callback
  94. r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
  95. assert(r);
  96. // find account balance of Bob after transaction
  97. cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
  98. mongoc_cursor_next (cursor, &doc);
  99. bson_iter_init (&iter, doc);
  100. bson_iter_find (&iter, "balance");
  101. bob_balance = (bson_iter_value (&iter))->value.v_int64;
  102. assert(bob_balance == new_bob_balance);
  103. assert(bob_balance == 1500);
  104. // cleanup
  105. bson_destroy(alice_query);
  106. bson_destroy(bob_query);
  107. mongoc_client_session_destroy(client_session);
  108. bson_destroy(opts);
  109. mongoc_transaction_opts_destroy(txn_opts);
  110. mongoc_read_concern_destroy(read_concern);
  111. mongoc_write_concern_destroy(write_concern);
  112. mongoc_cursor_destroy(cursor);
  113. // doc is owned by the cursor and must not be destroyed separately
  114. }
  115. int main(int argc, char* argv[]) {
  116. mongoc_init ();
  117. mongoc_client_t* client = mongoc_client_new (<connection uri>);
  118. bson_error_t error;
  119. // connect to bank db
  120. mongoc_database_t *database = mongoc_client_get_database (client, "bank");
  121. // access account collection
  122. mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
  123. // set amount to transfer
  124. int64_t amount_to_transfer = 500;
  125. // delete the collection if already existing
  126. mongoc_collection_drop(collection, &error);
  127. // open Alice account
  128. bson_t *alice_account = bson_new ();
  129. BSON_APPEND_UTF8(alice_account, "name", "Alice");
  130. BSON_APPEND_INT64(alice_account, "balance", 1000);
  131. // open Bob account
  132. bson_t *bob_account = bson_new ();
  133. BSON_APPEND_UTF8(bob_account, "name", "Bob");
  134. BSON_APPEND_INT64(bob_account, "balance", 1000);
  135. bool r = true;
  136. r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
  137. if (!r) {printf("Error encountered:%s", error.message);}
  138. r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
  139. if (!r) {printf("Error encountered:%s", error.message);}
  140. test_callback_money_transfer(client, collection, amount_to_transfer);
  141. }

Python

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Python.

# Sample Python code with callback API
import pymongo

def callback(session, balance, query):
    collection.update_one(query, {'$set': {"balance": balance}}, session=session)

client = pymongo.MongoClient(<connection uri>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')

# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500

# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
# causal_consistency is a keyword argument of start_session
with client.start_session(causal_consistency=False) as session:
    session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance

# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session(causal_consistency=False) as session:
    session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance

# Sample Python code with Core API
import pymongo

client = pymongo.MongoClient(<connection_string>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')

# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500

# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
with client.start_session(causal_consistency=False) as session:
    session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
    collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
    session.commit_transaction()
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance

# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session(causal_consistency=False) as session:
    session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
    collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
    session.commit_transaction()
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
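The C#, Ruby, Java, and Scala Core API examples on this page retry a transaction when the driver reports the TransientTransactionError label. A minimal Python sketch of the same pattern might look like the following. The helper name `run_with_retry`, the `txn_body` callable, and the `max_attempts` cap are illustrative assumptions, not part of any driver. The sketch assumes a PyMongo-style session (`start_transaction`, `commit_transaction`, `abort_transaction`, `in_transaction`) and exceptions that expose `has_error_label`.

```python
TRANSIENT_LABEL = "TransientTransactionError"


def run_with_retry(session, txn_body, max_attempts=3, **txn_options):
    """Run txn_body(session) in a transaction, retrying transient errors.

    Hypothetical helper: max_attempts and txn_options are assumptions
    made for this sketch.
    """
    for attempt in range(1, max_attempts + 1):
        session.start_transaction(**txn_options)
        try:
            result = txn_body(session)
            session.commit_transaction()
            return result
        except Exception as exc:
            # Abort so the session is not left holding an open transaction.
            if session.in_transaction:
                session.abort_transaction()
            has_label = getattr(exc, "has_error_label", None)
            transient = callable(has_label) and has_label(TRANSIENT_LABEL)
            # Give up on non-transient errors or when attempts run out.
            if not transient or attempt == max_attempts:
                raise
```

The whole transaction body is re-run on each attempt, so `txn_body` should be safe to execute more than once. Capping the attempts keeps a persistent write conflict from looping forever.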

Transaction API Examples for Core API

JavaScript

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with JavaScript.

// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});

session.startTransaction();

// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);

// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);

session.commitTransaction();
accountColl.find();

C#

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C#.

// C# Core API
public void TransferMoneyWithRetry(IMongoCollection<BsonDocument> accountColl, IClientSessionHandle session)
{
    var amountToTransfer = 500;
    // start transaction
    var transactionOptions = new TransactionOptions(
        readConcern: ReadConcern.Snapshot,
        writeConcern: WriteConcern.WMajority);
    session.StartTransaction(transactionOptions);
    try
    {
        // deduct $500 from Alice's account
        var aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
        Debug.Assert(aliceBalance >= amountToTransfer);
        var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
        accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
            Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
        aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
        Debug.Assert(aliceBalance == newAliceBalance);
        // add $500 to Bob's account
        var bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
        var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
        accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
            Builders<BsonDocument>.Update.Set("balance", newBobBalance));
        bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
        Debug.Assert(bobBalance == newBobBalance);
    }
    catch (Exception e)
    {
        session.AbortTransaction();
        throw;
    }
    session.CommitTransaction();
}

public void DoTransactionWithRetry(MongoClient client)
{
    var dbName = "bank";
    var collName = "account";
    using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
    {
        try
        {
            var bankDB = client.GetDatabase(dbName);
            var accountColl = bankDB.GetCollection<BsonDocument>(collName);
            bankDB.DropCollection(collName);
            accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
            accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
            while(true) {
                try
                {
                    TransferMoneyWithRetry(accountColl, session);
                    break;
                }
                catch (MongoException e)
                {
                    // retry only when the driver labels the error as transient
                    if(e.HasErrorLabel("TransientTransactionError"))
                    {
                        continue;
                    }
                    else
                    {
                        throw;
                    }
                }
            }
            // check values outside of transaction
            var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
            var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
            Debug.Assert(aliceNewBalance == 500);
            Debug.Assert(bobNewBalance == 1500);
        }
        catch (Exception e)
        {
            Console.WriteLine("Error running transaction: " + e.Message);
        }
    }
}

Ruby

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Ruby.

# Ruby Core API
def transfer_money_w_retry(session, accountColl)
  amountToTransfer = 500
  session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})
  # deduct $500 from Alice's account
  aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
  assert aliceBalance >= amountToTransfer
  newAliceBalance = aliceBalance - amountToTransfer
  accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
  aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
  assert_equal(newAliceBalance, aliceBalance)
  # add $500 to Bob's account
  bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
  newBobBalance = bobBalance + amountToTransfer
  accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
  bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
  assert_equal(newBobBalance, bobBalance)
  session.commit_transaction
end

def do_txn_w_retry(client)
  dbName = "bank"
  collName = "account"
  session = client.start_session(:causal_consistency=> false)
  bankDB = Mongo::Database.new(client, dbName)
  accountColl = bankDB[collName]
  accountColl.drop()
  accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
  accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
  begin
    # call the method defined above by its actual name
    transfer_money_w_retry(session, accountColl)
    puts "transaction committed"
  rescue Mongo::Error => e
    if e.label?('TransientTransactionError')
      retry
    else
      puts "transaction failed"
      raise
    end
  end
  # check results outside of transaction
  aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
  bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
  assert_equal(aliceBalance, 500)
  assert_equal(bobBalance, 1500)
end

Java

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Java.

  1. // Java (sync) - Core API
  2. public void transferMoneyWithRetry() {
  3. // connect to server
  4. MongoClientURI mongoURI = new MongoClientURI(uri);
  5. MongoClient mongoClient = new MongoClient(mongoURI);
  6. MongoDatabase bankDB = mongoClient.getDatabase("bank");
  7. MongoCollection accountColl = bankDB.getCollection("account");
  8. accountColl.drop();
  9. // insert some sample data
  10. accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
  11. accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
  12. while (true) {
  13. try {
  14. doTransferMoneyWithRetry(accountColl, mongoClient);
  15. break;
  16. } catch (MongoException e) {
  17. if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
  18. continue;
  19. } else {
  20. throw e;
  21. }
  22. }
  23. }
  24. }
  25. public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
  26. int amountToTransfer = 500;
  27. TransactionOptions txnOptions = TransactionOptions.builder()
  28. .readConcern(ReadConcern.SNAPSHOT)
  29. .writeConcern(WriteConcern.MAJORITY)
  30. .build();
  31. ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
  32. try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
  33. clientSession.startTransaction(txnOptions);
  34. // deduct $500 from Alice's account
  35. List<Document> documentList = new ArrayList<>();
  36. accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
  37. int aliceBalance = (int) documentList.get(0).get("balance");
  38. Assert.assertTrue(aliceBalance >= amountToTransfer);
  39. int newAliceBalance = aliceBalance - amountToTransfer;
  40. accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
  41. // check Alice's new balance
  42. documentList = new ArrayList<>();
  43. accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
  44. int updatedBalance = (int) documentList.get(0).get("balance");
  45. Assert.assertEquals(updatedBalance, newAliceBalance);
  46. // add $500 to Bob's account
  47. documentList = new ArrayList<>();
  48. accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
  49. int bobBalance = (int) documentList.get(0).get("balance");
  50. int newBobBalance = bobBalance + amountToTransfer;
  51. accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
  52. // check Bob's new balance
  53. documentList = new ArrayList<>();
  54. accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
  55. updatedBalance = (int) documentList.get(0).get("balance");
  56. Assert.assertEquals(updatedBalance, newBobBalance);
  57. // commit transaction
  58. clientSession.commitTransaction();
  59. }
  60. }
  61. // Java (async) -- Core API
  62. public void transferMoneyWithRetry() {
  63. // connect to the server
  64. MongoClient mongoClient = MongoClients.create(uri);
  65. MongoDatabase bankDB = mongoClient.getDatabase("bank");
  66. MongoCollection accountColl = bankDB.getCollection("account");
  67. SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();
  68. mongoClient.getDatabase("bank").drop().subscribe(dropCallback);
  69. dropCallback.await();
  70. // insert some sample data
  71. SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();
  72. accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);
  73. insertionCallback.await();
  74. insertionCallback = new SubscriberLatchWrapper<>();
  75. accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);
  76. insertionCallback.await();
  77. while (true) {
  78. try {
  79. doTransferMoneyWithRetry(accountColl, mongoClient);
  80. break;
  81. } catch (MongoException e) {
  82. if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
  83. continue;
  84. } else {
  85. throw e;
  86. }
  87. }
  88. }
  89. }
  90. public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
  91. int amountToTransfer = 500;
  92. // start the transaction
  93. TransactionOptions txnOptions = TransactionOptions.builder()
  94. .readConcern(ReadConcern.SNAPSHOT)
  95. .writeConcern(WriteConcern.MAJORITY)
  96. .build();
  97. ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
  98. SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();
  99. mongoClient.startSession(sessionOptions).subscribe(sessionCallback);
  100. ClientSession session = sessionCallback.get().get(0);
  101. session.startTransaction(txnOptions);
  102. // deduct $500 from Alice's account
  103. SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();
  104. accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
  105. Document documentFound = findCallback.get().get(0);
  106. int aliceBalance = (int) documentFound.get("balance");
  107. int newAliceBalance = aliceBalance - amountToTransfer;
  108. SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();
  109. accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);
  110. updateCallback.await();
  111. // check Alice's new balance
  112. findCallback = new SubscriberLatchWrapper<>();
  113. accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
  114. documentFound = findCallback.get().get(0);
  115. int updatedBalance = (int) documentFound.get("balance");
  116. Assert.assertEquals(updatedBalance, newAliceBalance);
  117. // add $500 to Bob's account
  118. findCallback = new SubscriberLatchWrapper<>();
  119. accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
  120. documentFound = findCallback.get().get(0);
  121. int bobBalance = (int) documentFound.get("balance");
  122. int newBobBalance = bobBalance + amountToTransfer;
  123. updateCallback = new SubscriberLatchWrapper<>();
  124. accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);
  125. updateCallback.await();
  126. // check Bob's new balance
  127. findCallback = new SubscriberLatchWrapper<>();
  128. accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
  129. documentFound = findCallback.get().get(0);
  130. updatedBalance = (int) documentFound.get("balance");
  131. Assert.assertEquals(updatedBalance, newBobBalance);
  132. // commit the transaction
  133. SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();
  134. session.commitTransaction().subscribe(transactionCallback);
  135. transactionCallback.await();
  136. }
  137. public class SubscriberLatchWrapper<T> implements Subscriber<T> {
  138. /**
  139. * A Subscriber that stores the publishers results and provides a latch so can block on completion.
  140. *
  141. * @param <T> The publishers result type
  142. */
  143. private final List<T> received;
  144. private final List<RuntimeException> errors;
  145. private final CountDownLatch latch;
  146. private volatile Subscription subscription;
  147. private volatile boolean completed;
  148. /**
  149. * Construct an instance
  150. */
  151. public SubscriberLatchWrapper() {
  152. this.received = new ArrayList<>();
  153. this.errors = new ArrayList<>();
  154. this.latch = new CountDownLatch(1);
  155. }
  156. @Override
  157. public void onSubscribe(final Subscription s) {
  158. subscription = s;
  159. subscription.request(Integer.MAX_VALUE);
  160. }
  161. @Override
  162. public void onNext(final T t) {
  163. received.add(t);
  164. }
  165. @Override
  166. public void onError(final Throwable t) {
  167. if (t instanceof RuntimeException) {
  168. errors.add((RuntimeException) t);
  169. } else {
  170. errors.add(new RuntimeException("Unexpected exception", t));
  171. }
  172. onComplete();
  173. }
  174. @Override
  175. public void onComplete() {
  176. completed = true;
  177. subscription.cancel();
  178. latch.countDown();
  179. }
  180. /**
  181. * Get received elements
  182. *
  183. * @return the list of received elements
  184. */
  185. public List<T> getReceived() {
  186. return received;
  187. }
  188. /**
  189. * Get received elements.
  190. *
  191. * @return the list of receive elements
  192. */
  193. public List<T> get() {
  194. return await().getReceived();
  195. }
  196. /**
  197. * Await completion or error
  198. *
  199. * @return this
  200. */
  201. public SubscriberLatchWrapper<T> await() {
  202. subscription.request(Integer.MAX_VALUE);
  203. try {
  204. if (!latch.await(300, TimeUnit.SECONDS)) {
  205. throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");
  206. }
  207. } catch (InterruptedException e) {
  208. throw new MongoInterruptedException("Interrupted waiting for observation", e);
  209. }
  210. if (!errors.isEmpty()) {
  211. throw errors.get(0);
  212. }
  213. return this;
  214. }
  215. public boolean getCompleted() {
  216. return this.completed;
  217. }
  218. public void close() {
  219. subscription.cancel();
  220. received.clear();
  221. }
  222. }

C

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C.

  1. // Sample C code with core session
  2. bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
  3. bool r = true;
  4. bson_error_t error;
  5. bson_t *opts = bson_new();
  6. bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");
  7. // set read & write concern
  8. mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
  9. mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
  10. mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
  11. mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
  12. mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
  13. mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
  14. mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
  15. mongoc_client_session_start_transaction (client_session, txn_opts, &error);
  16. mongoc_client_session_append (client_session, opts, &error);
  17. r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
  18. mongoc_client_session_commit_transaction (client_session, NULL, &error);
  19. bson_destroy (opts);
  20. mongoc_transaction_opts_destroy(txn_opts);
  21. mongoc_read_concern_destroy(read_concern);
  22. mongoc_write_concern_destroy(write_concern);
  23. bson_destroy (update);
  24. return r;
  25. }
  26. void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
  27. bson_t reply;
  28. bool r = true;
  29. const bson_t *doc;
  30. bson_iter_t iter;
  31. bson_error_t error;
  32. // find query
  33. bson_t *alice_query = bson_new ();
  34. BSON_APPEND_UTF8(alice_query, "name", "Alice");
  35. bson_t *bob_query = bson_new ();
  36. BSON_APPEND_UTF8(bob_query, "name", "Bob");
  37. // create session
  38. // set causal consistency to false
  39. mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
  40. mongoc_session_opts_set_causal_consistency (session_opts, false);
  41. // start the session
  42. mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
  43. // add session to options
  44. bson_t *opts = bson_new();
  45. mongoc_client_session_append (client_session, opts, &error);
  46. // deduct 500 from Alice
  47. // find account balance of Alice
  48. mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
  49. mongoc_cursor_next (cursor, &doc);
  50. bson_iter_init (&iter, doc);
  51. bson_iter_find (&iter, "balance");
  52. int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
  53. assert(alice_balance >= amount_to_transfer);
  54. int64_t new_alice_balance = alice_balance - amount_to_transfer;
  55. // core
  56. r = core_session (client_session, collection, alice_query, new_alice_balance);
  57. assert(r);
  58. // find account balance of Alice after transaction
  59. cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
  60. mongoc_cursor_next (cursor, &doc);
  61. bson_iter_init (&iter, doc);
  62. bson_iter_find (&iter, "balance");
  63. alice_balance = (bson_iter_value (&iter))->value.v_int64;
  64. assert(alice_balance == new_alice_balance);
  65. assert(alice_balance == 500);
  66. // add 500 to Bob's balance
  67. // find account balance of Bob
  68. cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
  69. mongoc_cursor_next (cursor, &doc);
  70. bson_iter_init (&iter, doc);
  71. bson_iter_find (&iter, "balance");
  72. int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
  73. int64_t new_bob_balance = bob_balance + amount_to_transfer;
  74. //core
  75. r = core_session (client_session, collection, bob_query, new_bob_balance);
  76. assert(r);
  77. // find account balance of Bob after transaction
  78. cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
  79. mongoc_cursor_next (cursor, &doc);
  80. bson_iter_init (&iter, doc);
  81. bson_iter_find (&iter, "balance");
  82. bob_balance = (bson_iter_value (&iter))->value.v_int64;
  83. assert(bob_balance == new_bob_balance);
  84. assert(bob_balance == 1500);
  85. // cleanup
  86. bson_destroy(alice_query);
  87. bson_destroy(bob_query);
  88. mongoc_client_session_destroy(client_session);
  89. bson_destroy(opts);
  90. mongoc_cursor_destroy(cursor);
  91. // doc is owned by the cursor and must not be destroyed separately
  92. }
  93. int main(int argc, char* argv[]) {
  94. mongoc_init ();
  95. mongoc_client_t* client = mongoc_client_new (<connection uri>);
  96. bson_error_t error;
  97. // connect to bank db
  98. mongoc_database_t *database = mongoc_client_get_database (client, "bank");
  99. // access account collection
  100. mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
  101. // set amount to transfer
  102. int64_t amount_to_transfer = 500;
  103. // delete the collection if already existing
  104. mongoc_collection_drop(collection, &error);
  105. // open Alice account
  106. bson_t *alice_account = bson_new ();
  107. BSON_APPEND_UTF8(alice_account, "name", "Alice");
  108. BSON_APPEND_INT64(alice_account, "balance", 1000);
  109. // open Bob account
  110. bson_t *bob_account = bson_new ();
  111. BSON_APPEND_UTF8(bob_account, "name", "Bob");
  112. BSON_APPEND_INT64(bob_account, "balance", 1000);
  113. bool r = true;
  114. r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
  115. if (!r) {printf("Error encountered:%s", error.message);}
  116. r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
  117. if (!r) {printf("Error encountered:%s", error.message);}
  118. test_core_money_transfer(client, collection, amount_to_transfer);
  119. }

Scala

The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Scala.

  1. // Scala Core API
  2. def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {
  3. val accountColl = database.getCollection("account")
  4. var amountToTransfer = 500
    var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
      clientSession.startTransaction()
      // deduct $500 from Alice's account
      var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
      assert(aliceBalance >= amountToTransfer)
      var newAliceBalance = aliceBalance - amountToTransfer
      accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
      aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
      assert(aliceBalance == newAliceBalance)
      // add $500 to Bob's account
      var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
      var newBobBalance = bobBalance + amountToTransfer
      accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
      bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
      assert(bobBalance == newBobBalance)
      clientSession
    })
    transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
  }
  def doTransactionWithRetry(): Unit = {
    val client: MongoClient = MongoClientWrapper.getMongoClient()
    val database: MongoDatabase = client.getDatabase("bank")
    val accountColl = database.getCollection("account")
    accountColl.drop().await()
    val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
    var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
    accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
    accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()
    var retry = true
    while (retry) {
      try {
        transferMoneyWithRetry(sessionObservable, database)
        println("transaction committed")
        retry = false
      }
      catch {
        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
          println("retrying transaction")
        }
        case other: Throwable => {
          println("transaction failed")
          retry = false
          throw other
        }
      }
    }
    // check results outside of transaction
    assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
    assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)
    accountColl.drop().await()
  }

Supported Commands

  Command                     Supported
  abortTransaction            Yes
  commitTransaction           Yes
  endSessions                 Yes
  killSessions                Yes
  killAllSessions             Yes
  killAllSessionsByPattern    No
  refreshSessions             No
  startSession                Yes

Unsupported Capabilities

  Methods                                                 Stages or Commands
  db.collection.aggregate()                               $collStats, $currentOp, $indexStats, $listSessions, $out
  db.collection.count(), db.collection.countDocuments()   $where, $near, $nearSphere
  db.collection.insert()                                  insert is supported only when it targets a pre-existing collection.
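Because an insert inside a transaction must target a collection that already exists, one option is to create any missing collections before the transaction starts. The following Python sketch illustrates the idea. It assumes `db` is a PyMongo `Database` object (or anything exposing `list_collection_names()` and `create_collection()`); the helper name `ensure_collections` is hypothetical and not part of any driver.

```python
def ensure_collections(db, names):
    """Create every collection in `names` that does not exist yet.

    `db` is assumed to behave like a PyMongo Database. Call this
    before starting a transaction, never inside one.
    Returns the names of the collections that were created.
    """
    existing = set(db.list_collection_names())
    created = []
    for name in names:
        if name not in existing:
            # Collections cannot be created inside a transaction,
            # so this runs as a normal, non-transactional command.
            db.create_collection(name)
            created.append(name)
    return created
```

Calling the helper again is harmless: it returns an empty list once every collection exists.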

Sessions

MongoDB sessions are a framework used to support retryable writes, causal consistency, and transactions, and to manage operations across shards. When a session is created, the client generates a logical session identifier (lsid) and uses it to tag every operation in that session when sending commands to the server.

Amazon DocumentDB supports the use of sessions to enable transactions, but does not support causal consistency or retryable writes.

In Amazon DocumentDB, you start a transaction from within a session by calling the session.startTransaction() API. A session supports a single transaction at a time. You complete a transaction by calling either the commit (session.commitTransaction()) or abort (session.abortTransaction()) API.
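The start, commit, and abort calls can be sketched in Python as follows. This is a minimal illustration rather than a definitive implementation. It assumes `client` is a PyMongo `MongoClient`, reuses the `bank` database and `account` collection from the earlier samples, and disables causal consistency on the session because Amazon DocumentDB does not support it. The function name `transfer` is hypothetical.

```python
def transfer(client, from_name, to_name, amount):
    """Move `amount` between two accounts inside a single transaction."""
    accounts = client["bank"]["account"]
    # Causal consistency is turned off because DocumentDB does not support it.
    with client.start_session(causal_consistency=False) as session:
        session.start_transaction()
        try:
            src = accounts.find_one({"name": from_name}, session=session)
            dst = accounts.find_one({"name": to_name}, session=session)
            if src["balance"] < amount:
                raise ValueError("insufficient funds")
            # Write absolute balances with $set, as the samples above do.
            accounts.update_one({"name": from_name},
                                {"$set": {"balance": src["balance"] - amount}},
                                session=session)
            accounts.update_one({"name": to_name},
                                {"$set": {"balance": dst["balance"] + amount}},
                                session=session)
            session.commit_transaction()
        except Exception:
            # Abort only if the transaction is still open, so that a failed
            # commit does not trigger a second error that hides the first.
            if session.in_transaction:
                session.abort_transaction()
            raise
```

Every operation passes the same `session` object; an operation issued without it runs outside the transaction.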

Causal consistency

Causal consistency guarantees that, within a single client session, the client observes read-after-write consistency, monotonic reads and writes, and writes that follow reads. These guarantees apply across all instances in a cluster, not just the primary. Amazon DocumentDB does not support causal consistency, and the following statements result in an error.

  var mySession = db.getMongo().startSession();
  var mySessionObject = mySession.getDatabase('test').getCollection('account');
  mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
  //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
  mySessionObject.find()
  //Error: error: {
  // "ok" : 0,
  // "code" : 303,
  // "errmsg" : "Feature not supported: 'causal consistency'",
  // "operationTime" : Timestamp(1603461817, 493214)
  //}
  mySession.endSession()

You can disable causal consistency within a session. Doing so lets you use the session framework, but it does not provide causal consistency guarantees for reads. In Amazon DocumentDB, reads from the primary are read-after-write consistent, and reads from replica instances are eventually consistent. Transactions are the primary use case for sessions.

  var mySession = db.getMongo().startSession({causalConsistency: false});
  var mySessionObject = mySession.getDatabase('test').getCollection('account');
  mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
  //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
  mySessionObject.find()
  //{ "_id" : 1, "name" : "Bob", "balance" : 100 }
  //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

Retryable writes

Retryable writes is a capability in which the client retries a write operation one time when a network error occurs or when the client cannot find the primary. Amazon DocumentDB does not support retryable writes, so they must be disabled. You can disable them by setting retryWrites=false in the connection string, as in the following example:

  mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
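If connection strings are assembled in application code, a small helper can make sure the option is always present. The following Python sketch uses only the standard library; the function name `with_retry_writes_disabled` is hypothetical. It keeps the other query options in their original order and forces `retryWrites=false`.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_retry_writes_disabled(uri):
    """Return `uri` with retryWrites forced to false.

    Other query options are kept in their original order. Note that
    urlencode percent-encodes any reserved characters in option values.
    """
    parts = urlsplit(uri)
    options = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key.lower() != "retrywrites"  # drop any existing setting
    ]
    options.append(("retryWrites", "false"))
    return urlunsplit(parts._replace(query=urlencode(options)))
```

The returned string can then be passed to the driver's client constructor in place of the original URI.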

Transaction Errors

When using transactions, some scenarios can yield an error stating that a transaction number does not match any in-progress transaction.

The error can be generated in at least two different scenarios:

  • After the one-minute transaction timeout.
  • After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can’t tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.

The best way to handle this error is to make transactional updates idempotent, for example by using the $set mutator instead of an increment/decrement operation. The error looks like the following:

  { "ok" : 0,
    "operationTime" : Timestamp(1603938167, 1),
    "code" : 251,
    "errmsg" : "Given transaction number 1 does not match any in-progress transactions."
  }
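One way to act on this advice is to wrap an idempotent transaction in a small retry loop that re-runs it when the server reports code 251, the code shown in the sample error above. The following Python sketch is an assumption-laden illustration: the helper name `run_idempotent` and the `max_attempts` parameter are hypothetical, and it relies on the raised exception exposing the server error code as a `code` attribute, as PyMongo's `OperationFailure` does.

```python
NO_SUCH_TRANSACTION = 251  # error code from the sample error above


def run_idempotent(txn_fn, max_attempts=3):
    """Call txn_fn() and re-run it when it fails with error code 251.

    Use this only when txn_fn is idempotent (for example, it writes
    absolute values with $set), because a 251 error can follow a
    commit that actually succeeded.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_fn()
        except Exception as exc:
            # Re-raise anything that is not code 251, and give up
            # once the final attempt has also failed.
            if getattr(exc, "code", None) != NO_SUCH_TRANSACTION:
                raise
            if attempt == max_attempts:
                raise
```

Re-running a transaction that writes with $set leaves the data in the same final state whether or not the earlier attempt committed, which is why the loop is safe only for idempotent work.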