Transactions
Amazon DocumentDB (with MongoDB compatibility) now supports MongoDB 4.0 compatibility including transactions. You can perform transactions across multiple documents, statements, collections, and databases. Transactions simplify application development by enabling you to perform atomic, consistent, isolated, and durable (ACID) operations across one or more documents within an Amazon DocumentDB cluster. Common use cases for transactions include financial processing, fulfilling and managing orders, and building multi-player games.
There is no additional cost for transactions. You only pay for the read and write IOs that you consume as part of the transactions.
Requirements
To use the transactions feature, you need to meet the following requirements:
You must be using the Amazon DocumentDB 4.0 engine.
You must use a driver compatible with MongoDB 4.0 or greater.
Best Practices
Here are some best practices so that you can get the most using transactions with Amazon DocumentDB.
Always commit or abort the transaction after it is complete. Leaving a transaction in an incomplete state ties up database resources and can cause write conflicts.
It is recommended to keep transactions to the smallest number of commands needed. If you have transactions with multiple statements that can be divided up into multiple smaller transactions, it is advisable to do so to reduce the likelihood of a timeout. Always aim to create short transactions, not long-running reads.
Limitations
Amazon DocumentDB does not support cursors within a transaction.
Amazon DocumentDB cannot create new collections in a transaction and cannot query/update against non-existing collections.
Document-level write locks are subject to a 1 minute timeout, which is not configurable by the user.
No support for retryable writes, retryable commit and retryable abort.
Each Amazon DocumentDB instance has an upper bound limit on the number of concurrent transaction open on the instance at one time. For the limits, please see Instance Limits.
For a given transaction, the transaction log size must be less than 32MB.
Amazon DocumentDB does support
count()
within a transactions, but not all drivers support this capability. An alternative is to use thecountDocuments()
API, which translates the count query into an aggregation query on the client side.Transactions have a one minute execution limit and sessions have a 30-minute timeout. If a transaction times out, it will be aborted, and any subsequent commands issued within the session for the existing transaction will yield the following error:
WriteCommandError({
"ok" : 0,
"operationTime" : Timestamp(1603491424, 627726),
"code" : 251,
"errmsg" : "Given transaction number 0 does not match any in-progress transactions."
})
Monitoring and Diagnostics
With the support for transactions in Amazon DocumentDB 4.0, additional CloudWatch metrics were added to help you monitor your transactions.
New CloudWatch Metrics
DatabaseTransactions
: The number of open transactions taken at a one-minute period.DatabaseTransactionsAborted
: The number of aborted transactions taken at a one-minute period.DatabaseTransactionsMax
: The maximum number of open transactions in a one-minute period.TransactionsAborted
: The number of transactions aborted on an instance in a one-minute period.TransactionsCommitted
: The number of transactions committed on an instance in a one-minute period.TransactionsOpen
: The number of transactions open on an instance taken at a one-minute period.TransactionsOpenMax
: The maximum number of transactions open on an instance in a one-minute period.TransactionsStarted
: The number of transactions started on an instance in a one-minute period.
Note
For more CloudWatch metrics for Amazon DocumentDB, go to Monitoring Amazon DocumentDB with CloudWatch.
Additionally, new fields were added to both currentOp
lsid
, transactionThreadId
, and a new state for “idle transaction
” and serverStatus
transactions: currentActive
, currentInactive
, currentOpen
, totalAborted
, totalCommitted
, and totalStarted
.
Transaction Isolation Level
When starting a transaction, you have the ability to specify the both the readConcern
and writeConcern
as shown in the example below:
mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});
For readConcern
, Amazon DocumentDB supports snapshot isolation by default. If a readConcern
of local, available, or majority are specified, Amazon DocumentDB will upgrade the readConcern
level to snapshot. Amazon DocumentDB does not support the linearizable readConcern
and specifying such a read concern will result in an error.
For writeConcern
, Amazon DocumentDB supports majority by default and a write quorum is achieved when four copies of the data are persisted across three AZs. If a lower writeConcern
is specified, Amazon DocumentDB will upgrade the writeConcern
to majority. Further, all Amazon DocumentDB writes are journaled and journaling cannot be disabled.
Use Cases
In this section, we will walk through two use cases for transactions: multi-statement and multi-collection.
Multi-Statement Transactions
Amazon DocumentDB transactions are multi-statement, which means you can write a transaction that spans multiple statements with an explicit commit or rollback. You can group insert
, update
, update
, and findAndModify
actions as a single atomic operation.
A common use case for multi-statement transactions is a debit-credit transaction. For example: you owe a friend money for clothes. Thus, you need to debit (withdraw) $500 from your account and credit $500 (deposit) to your friend’s account. To perform that operation, you perform both the debt and credit operations within a single transaction to ensure atomicity. Doing so prevents scenarios where $500 is debited from your account, but not credited to your friend’s account. Here’s what this use case would look like:
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountColl.find();
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
session.abortTransaction();
Multi-Collection Transactions
Our transactions are also multi-collection, which means they can be used to perform multiple operations within a single transaction and across multiple collections. This provides a consistent view of data and maintains your data’s integrity. When you commit the commands as a single <>
, the transactions are all-or-nothing executions—in that, they will either all succeed or all fail.
Here is an example of multi-collection transactions, using the same scenario and data from the example for multi-statement transactions.
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var amountToTransfer = 500;
var collectionName = "account";
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountCollInBankA.find(); // Alice holds $500 in bankA
accountCollInBankB.find(); // Bob holds $1500 in bankB
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.abortTransaction();
accountCollInBankA.find(); // Alice holds $1000 in bankA
accountCollInBankB.find(); // Bob holds $1000 in bankB
Transaction API Examples for Callback API
The callback API is only available for 4.2+ drivers.
Javascript
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Javascript.
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
Node.js
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Node.js.
// Node.js callback API:
const bankDB = await mongoclient.db("bank");
var accountColl = await bankDB.createCollection("account");
var amountToTransfer = 500;
const session = mongoclient.startSession({causalConsistency: false});
await accountColl.drop();
await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
const transactionOptions = {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
};
// deduct $500 from Alice's account
var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(aliceBalance.balance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
await session.commitTransaction();
aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(newAliceBalance == aliceBalance.balance);
// add $500 to Bob's account
var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
var newBobBalance = bobBalance.balance + amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
await session.commitTransaction();
bobBalance = await accountColl.findOne({name: "Bob"}, {session});
assert(newBobBalance == bobBalance.balance);
C#
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C#.
// C# Callback API
var dbName = "bank";
var collName = "account";
var amountToTransfer = 500;
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<BsonDocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
var result = session.WithTransaction(
(sess, cancellationtoken) =>
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 from Bob's account
var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
Builders<BsonDocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
return "Transaction committed";
}, transactionOptions);
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
Ruby
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Ruby.
// Ruby Callback API
dbName = "bank"
collName = "account"
amountToTransfer = 500
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
# start transaction
session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 from Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
session.end_session
Go
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Go.
// Go - Callback API
type Account struct {
Name string
Balance int
}
ctx := context.TODO()
dbName := "bank"
collName := "account"
amountToTransfer := 500
session, err := client.StartSession(options.Session().SetCausalConsistency(false))
assert.NilError(t, err)
defer session.EndSession(ctx)
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
accountColl.Drop(ctx)
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
_, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {
var result Account
// deduct $500 from Alice's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
if err != nil {
return nil, err
}
return "transaction committed", err
}, transactionOptions)
// check results outside of transaction
var result Account
err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
aliceNewBalance := result.Balance
err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobNewBalance := result.Balance
assert.Equal(t, aliceNewBalance, 500)
assert.Equal(t, bobNewBalance, 1500)
// Go - Core API
type Account struct {
Name string
Balance int
}
func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {
amountToTransfer := 500
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
if err := sessionContext.StartTransaction(transactionOptions); err != nil {
panic(err)
}
var result Account
// deduct $500 from Alice's account
err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
if err != nil {
sessionContext.AbortTransaction(sessionContext)
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
if err != nil {
sessionContext.AbortTransaction(sessionContext)
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
err = sessionContext.CommitTransaction(sessionContext)
return err
}
func doTransactionWithRetry(t *testing.T) {
ctx := context.TODO()
dbName := "bank"
collName := "account"
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {
accountColl.Drop(ctx)
accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})
accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})
for {
err := transferMoneyWithRetry(sessionContext, accountColl, t)
if err == nil {
println("transaction committed")
return nil
}
if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") {
continue
}
println("transaction failed")
return err
}
})
// check results outside of transaction
var result Account
accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&esult)
aliceBalance := result.Balance
assert.Equal(t, aliceBalance, 500)
accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
assert.Equal(t, bobBalance, 1500)
}
Java
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Java.
// Java (sync) - Callback API
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
accountColl.drop();
int amountToTransfer = 500;
// add sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.withTransaction(new TransactionBody<Void>() {
@Override
public Void execute() {
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
return null;
}
}, txnOptions);
}
C
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C.
// Sample Code for C with Callback
#include <bson.h>
#include <mongoc.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
typedef struct {
int64_t balance;
bson_t *account;
bson_t *opts;
mongoc_collection_t *collection;
} ctx_t;
bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
bool r = true;
ctx_t *data = (ctx_t *) ctx;
bson_t local_reply;
bson_t *selector = data->account;
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
*reply = bson_copy (&local_reply);
bson_destroy (&local_reply);
bson_destroy (update);
return r;
}
void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
ctx_t alice_ctx;
ctx_t bob_ctx;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// set variables which will be used by callback function
alice_ctx.collection = collection;
alice_ctx.opts = opts;
alice_ctx.balance = new_alice_balance;
alice_ctx.account = alice_query;
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
bob_ctx.collection = collection;
bob_ctx.opts = opts;
bob_ctx.balance = new_bob_balance;
bob_ctx.account = bob_query;
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
mongoc_cursor_destroy(cursor);
bson_destroy(doc);
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
test_callback_money_transfer(client, collection, amount_to_transfer);
}
Python
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Python.
// Sample Python code with callback api
import pymongo
def callback(session, balance, query):
collection.update_one(query, {'$set': {"balance": balance}}, session=session)
client = pymongo.MongoClient(<connection uri>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')
# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500
# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance
# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
Sample Python code with Core api
import pymongo
client = pymongo.MongoClient(<connection_string>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')
# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500
# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
session.commit_transaction()
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance
# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
session.commit_transaction()
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
Transaction API Examples for Core API
Javascript
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Javascript.
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
C#
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C#.
// C# Core API
public void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session)
{
var amountToTransfer = 500;
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
session.StartTransaction(transactionOptions);
try
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"),
Builders<bSondocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 from Bob's account
var bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"),
Builders<bSondocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
}
catch (Exception e)
{
session.AbortTransaction();
throw;
}
session.CommitTransaction();
}
}
public void DoTransactionWithRetry(MongoClient client)
{
var dbName = "bank";
var collName = "account";
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
try
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<bSondocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
while(true) {
try
{
TransferMoneyWithRetry(accountColl, session);
break;
}
catch (MongoException e)
{
if(e.HasErrorLabel("TransientTransactionError"))
{
continue;
}
else
{
throw;
}
}
}
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
catch (Exception e)
{
Console.WriteLine("Error running transaction: " + e.Message);
}
}
}
Ruby
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Ruby.
# Ruby Core API
def transfer_money_w_retry(session, accountColl)
amountToTransfer = 500
session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 to Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
session.commit_transaction
end
def do_txn_w_retry(client)
dbName = "bank"
collName = "account"
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
begin
transferMoneyWithRetry(session, accountColl)
puts "transaction committed"
rescue Mongo::Error => e
if e.label?('TransientTransactionError')
retry
else
puts "transaction failed"
raise
end
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
end
Java
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Java.
// Java (sync) - Core API
public void transferMoneyWithRetry() {
// connect to server
MongoClientURI mongoURI = new MongoClientURI(uri);
MongoClient mongoClient = new MongoClient(mongoURI);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
accountColl.drop();
// insert some sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.startTransaction(txnOptions);
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
Assert.assertTrue(aliceBalance >= amountToTransfer);
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit transaction
clientSession.commitTransaction();
}
}
// Java (async) -- Core API
public void transferMoneyWithRetry() {
// connect to the server
MongoClient mongoClient = MongoClients.create(uri);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();
mongoClient.getDatabase("bank").drop().subscribe(dropCallback);
dropCallback.await();
// insert some sample data
SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);
insertionCallback.await();
insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);;
insertionCallback.await();
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
// start the transaction
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();
mongoClient.startSession(sessionOptions).subscribe(sessionCallback);
ClientSession session = sessionCallback.get().get(0);
session.startTransaction(txnOptions);
// deduct $500 from Alice's account
SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
Document documentFound = findCallback.get().get(0);
int aliceBalance = (int) documentFound.get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);
updateCallback.await();
// check Alice's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int bobBalance = (int) documentFound.get("balance");
int newBobBalance = bobBalance + amountToTransfer;
updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);
updateCallback.await();
// check Bob's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit the transaction
SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();
session.commitTransaction().subscribe(transactionCallback);
transactionCallback.await();
}
public class SubscriberLatchWrapper<T> implements Subscriber<T> {
/**
* A Subscriber that stores the publishers results and provides a latch so can block on completion.
*
* @param <T> The publishers result type
*/
private final List<T> received;
private final List<RuntimeException> errors;
private final CountDownLatch latch;
private volatile Subscription subscription;
private volatile boolean completed;
/**
* Construct an instance
*/
public SubscriberLatchWrapper() {
this.received = new ArrayList<>();
this.errors = new ArrayList<>();
this.latch = new CountDownLatch(1);
}
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(final T t) {
received.add(t);
}
@Override
public void onError(final Throwable t) {
if (t instanceof RuntimeException) {
errors.add((RuntimeException) t);
} else {
errors.add(new RuntimeException("Unexpected exception", t));
}
onComplete();
}
@Override
public void onComplete() {
completed = true;
subscription.cancel();
latch.countDown();
}
/**
* Get received elements
*
* @return the list of received elements
*/
public List<T> getReceived() {
return received;
}
/**
* Get received elements.
*
* @return the list of receive elements
*/
public List<T> get() {
return await().getReceived();
}
/**
* Await completion or error
*
* @return this
*/
public SubscriberLatchWrapper<T> await() {
subscription.request(Integer.MAX_VALUE);
try {
if (!latch.await(300, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");
}
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted waiting for observeration", e);
}
if (!errors.isEmpty()) {
throw errors.get(0);
}
return this;
}
public boolean getCompleted() {
return this.completed;
}
public void close() {
subscription.cancel();
received.clear();
}
}
C
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with C.
// Sample C code with core session
bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
bool r = true;
bson_error_t error;
bson_t *opts = bson_new();
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
mongoc_client_session_start_transaction (client_session, txn_opts, &error);
mongoc_client_session_append (client_session, opts, &error);
r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
mongoc_client_session_commit_transaction (client_session, NULL, &error);
bson_destroy (opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
bson_destroy (update);
return r;
}
void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// core
r = core_session (client_session, collection, alice_query, new_alice_balance);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to Bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
//core
r = core_session (client_session, collection, bob_query, new_bob_balance);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_cursor_destroy(cursor);
bson_destroy(doc);
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
test_core_money_transfer(client, collection, amount_to_transfer);
}
Scala
The following code demonstrates how to utilize the Amazon DocumentDB transaction API with Scala.
// Scala Core API
def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {
val accountColl = database.getCollection("account")
var amountToTransfer = 500
var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
clientSession.startTransaction()
// deduct $500 from Alice's account
var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance >= amountToTransfer)
var newAliceBalance = aliceBalance - amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance == newAliceBalance)
// add $500 to Bob's account
var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
var newBobBalance = bobBalance + amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
assert(bobBalance == newBobBalance)
clientSession
})
transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
}
def doTransactionWithRetry(): Unit = {
val client: MongoClient = MongoClientWrapper.getMongoClient()
val database: MongoDatabase = client.getDatabase("bank")
val accountColl = database.getCollection("account")
accountColl.drop().await()
val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()
var retry = true
while (retry) {
try {
transferMoneyWithRetry(sessionObservable, database)
println("transaction committed")
retry = false
}
catch {
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("retrying transaction")
}
case other: Throwable => {
println("transaction failed")
retry = false
throw other
}
}
}
// check results outside of transaction
assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)
accountColl.drop().await()
}
Supported Commands
Command | Supported |
---|---|
| Yes |
| Yes |
| Yes |
| Yes |
| Yes |
| No |
| No |
| Yes |
Unsupported Capabilities
Methods | Stages or Commands |
---|---|
|
|
|
|
|
|
Sessions
MongoDB sessions are a framework that is used to support retryable writes, causal consistency, transactions, and manage operations across shards. When a session is created, a logical session identifier (lsid) is generated by the client and is used to tag all operations within that session when sending commands to the server.
Amazon DocumentDB supports the use of sessions to enable transactions, but does not support causal consistency or retryable writes.
When utilizing transactions within Amazon DocumentDB, a transaction will be initiated from within a session using the session.startTransaction()
API and a session supports a single transaction at a time. Similarly, transactions are completed using either the commit (session.commitTransaction()
) or abort (session.abortTransaction()
) APIs.
Causal consistency
Causal consistency guarantees that within a single client session the client will observe read-after-write consistency, monatomic reads/writes, and writes will follow reads and these guarantees apply across all instances in a cluster, not just the primary. Amazon DocumentDB does not support causal consistency and the following statement will result in an error.
var mySession = db.getMongo().startSession();
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//Error: error: {
// "ok" : 0,
// "code" : 303,
// "errmsg" : "Feature not supported: 'causal consistency'",
// "operationTime" : Timestamp(1603461817, 493214)
//}
mySession.endSession()
You can disable causal consistency within a session. Please note, doing so will enable you to utilize the session framework, but will not provide causal consistency guarantees for reads. When using Amazon DocumentDB, reads from the primary will be read-after-write consistent and reads from the replica instances will be eventually consistent. Transactions are the primary use case for utilizing sessions.
var mySession = db.getMongo().startSession({causalConsistency: false});
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//{ "_id" : 1, "name" : "Bob", "balance" : 100 }
//{ "_id" : 2, "name" : "Alice", "balance" : 1700 }
Retryable writes
Retryable writes is a capability in which the client will attempt to retry write operations, one time, when network errors occur or if the client is unable to find the primary. In Amazon DocumentDB, retryable writes are not supported and must be disabled. You can disabled it with the command (retryWrites=false
) in the connection string. Below is an example:
mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
Transaction Errors
When using transactions, there are scenarios that can yeld an error that states that a transaction number does not match any in progress transaction.
The error can be generated in at least two different scenarios:
- After the one-minute transaction timeout.
- After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can’t tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.
The best way to handle this error is to make transactional updates idempotent — for example, by using the $set
mutator instead of an increment/decrement operation. See below:
{ "ok" : 0,
"operationTime" : Timestamp(1603938167, 1),
"code" : 251,
"errmsg" : "Given transaction number 1 does not match any in-progress transactions."
}