Distributed Local Writes for Insert Only Workloads
MongoDB Tag Aware Sharding allows administrators to control data distributionin a sharded cluster by defining ranges of the shard key and taggingthem to one or more shards.
This tutorial uses Zones along with a multi-datacentersharded cluster deployment and application-side logic to support distributedlocal writes, as well as high write availability in the event of a replica setelection or datacenter failure.
Tip
Changed in version 4.0.3: By defining the zones and the zone ranges before sharding an emptyor a non-existing collection, the shard collection operation createschunks for the defined zone ranges as well as any additional chunksto cover the entire range of the shard key values and performs aninitial chunk distribution based on the zone ranges. This initialcreation and distribution of chunks allows for faster setup of zonedsharding. After the initial distribution, the balancer manages thechunk distribution going forward.
See Pre-Define Zones and Zone Ranges for an Empty or Non-Existing Collection for an example.
Important
The concepts discussed in this tutorial require a specific deploymentarchitecture, as well as application-level logic.
These concepts require familiarity with MongoDB sharded clusters, replica sets, and the generalbehavior of zones.
This tutorial assumes an insert-only or insert-intensive workload. Theconcepts and strategies discussed in this tutorial are not well suited foruse cases that require fast reads or updates.
Scenario
Consider an insert-intensive application, where reads are infrequent and lowpriority compared to writes. The application writes documents to a shardedcollection, and requires near-constant uptime from thedatabase to support its SLAs or SLOs.
The following represents a partial view of the format of documents theapplication writes to the database:
- {
- "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
- "message_id" : 329620,
- "datacenter" : "alfa",
- "userid" : 123,
- ...
- }
- {
- "_id" : ObjectId("56f08c447fe58b2e96f595fb"),
- "message_id" : 578494,
- "datacenter" : "bravo",
- "userid" : 456,
- ...
- }
- {
- "_id" : ObjectId("56f08c447fe58b2e96f595fc"),
- "message_id" : 689979,
- "datacenter" : "bravo",
- "userid" : 789,
- ...
- }
Shard Key
The collection uses the { datacenter : 1, userid : 1 }
compound index asthe shard key.
The datacenter
field in each document allows for creating a tag range oneach distinct datacenter value. Without the datacenter
field, it would notbe possible to associate a document with a specific datacenter.
The userid
field provides a high cardinalityand low frequency component to the shard keyrelative to datacenter
.
See Choosing a Shard Key for moregeneral instructions on selecting a shard key.
Architecture
The deployment consists of two datacenters, alfa
and bravo
. There aretwo shards, shard0000
and shard0001
. Each shard is a replicaset with three members. shard0000
has two members on alfa
and onepriority 0 member on bravo
.shard0001
has two members on bravo
and one priority 0 member on alfa
.
Tags
This application requires one tag per datacenter. Each shard has onetag assigned to it based on the datacenter containing the majority ofits replica set members. There are two tag ranges, one for each datacenter.
alfa
Datacenter- Tag shards with a majority of members on this datacenter as
alfa
.
Create a tag range with:
- a lower bound of
{ "datacenter" : "alfa", "userid" : MinKey }
, - an upper bound of
{ "datacenter" : "alfa", "userid" : MaxKey }
, and - the tag
alfa
bravo
Datacenter- Tag shards with a majority of members on this datacenter as
bravo
.
Create a tag range with:
- a lower bound of
{ "datacenter" : "bravo", "userid" : MinKey }
, - an upper bound of
{ "datacenter" : "bravo", "userid" : MaxKey }
, and - the tag
bravo
Note
The MinKey andMaxKey values are reserved specialvalues for comparisons
Based on theconfigured tags and tag ranges, mongos
routes documents withdatacenter : alfa
to the alfa
datacenter, and documents withdatacenter : bravo
to the bravo
datacenter.
Write Operations
If an inserted or updated document matches a configured tag range, it can onlybe written to a shard with the related tag.
MongoDB can write documents that do not match a configured tag range to anyshard in the cluster.
Note
The behavior described above requires the cluster to be in a steady statewith no chunks violating a configured tag range. See the following sectionon the balancer formore information.
Balancer
The balancermigrates the tagged chunks to the appropriate shard. Untilthe migration, shards may contain chunks that violate configured tag rangesand tags. Once balancing completes, shards should only contain chunks whoseranges do not violate its assigned tags and tag ranges.
Adding or removing tags or tag ranges can result in chunk migrations.Depending on the size of your data set and the number of chunks a tag rangeaffects, these migrations may impact cluster performance. Consider runningyour balancer during specific scheduled windows.See Schedule the Balancing Window for a tutorial on how to set ascheduling window.
Application Behavior
By default, the application writes to the nearest datacenter. If the localdatacenter is down, or if writes to that datacenter are not acknowledgedwithin a set time period, the application switches to the other availabledatacenter by changing the value of the datacenter
field before attemptingto write the document to the database.
The application supports write timeouts. The application usesWrite Concern to set a timeout for each writeoperation.
If the application encounters a write or timeout error, it modifies thedatacenter
field in each document and performs the write. This routes thedocument to the other datacenter. If both datacenters are down, then writescannot succeed. See Resolve Write Failure.
The application periodically checks connectivity to any datacenters marked as “down”. If connectivity is restored, the application cancontinue performing normal write operations.
Given the switching logic, as well as any load balancers or similar mechanismsin place to handle client traffic between datacenters, the application cannotpredict which of the two datacenters a given document was written to. Toensure that no documents are missed as a part of read operations, theapplication must perform broadcast queries by not including the datacenter
field as apart of any query.
The application performs reads using a read preference of nearest
to reduce latency.
It is possible for a write operation to succeed despite a reported timeouterror. The application responds to the error by attempting to re-writethe document to the other datacenter - this can result in a document beingduplicated across both datacenters. The application resolves duplicatesas a part of the read logic.
Switching Logic
The application has logic to switch datacenters if one or more writes fail, orif writes are not acknowledged within a set timeperiod. The application modifies the datacenter
field based on the targetdatacenter’s tag to direct thedocument towards that datacenter.
For example, an application attempting to write to the alfa
datacentermight follow this general procedure:
- Attempt to write document, specifying
datacenter : alfa
. - On write timeout or error, log
alfa
as momentarily down. - Attempt to write same document, modifying
datacenter : bravo
. - On write timeout or error, log
bravo
as momentarily down. - If both
alfa
andbravo
are down, log and report errors.See Resolve Write Failure.
Procedure
Configure Shard Tags
You must be connected to a mongos
associated with the targetsharded cluster in order to proceed. You cannot create tags byconnecting directly to a shard replica set member.
Tag each shard.
Tag each shard in the alfa
data center with the alfa
tag.
- sh.addShardTag("shard0000", "alfa")
Tag each shard in the bravo
data center with the bravo
tag.
- sh.addShardTag("shard0001", "bravo")
You can review the tags assigned to any given shard by runningsh.status()
.
Define ranges for each tag.
Define the range for the alfa
database and associate it to the alfa
tag using the sh.addTagRange()
method. This method requires:
- The full namespace of the target collection.
- The inclusive lower bound of the range.
- The exclusive upper bound of the range.
- The name of the tag.
- sh.addTagRange(
- "<database>.<collection>",
- { "datacenter" : "alfa", "userid" : MinKey },
- { "datacenter" : "alfa", "userid" : MaxKey },
- "alfa"
- )
Define the range for the bravo
database and associate it to thebravo
tag using the sh.addTagRange()
method. This methodrequires:
- The full namespace of the target collection.
- The inclusive lower bound of the range.
- The exclusive upper bound of the range.
- The name of the tag.
- sh.addTagRange(
- "<database>.<collection>",
- { "datacenter" : "bravo", "userid" : MinKey },
- { "datacenter" : "bravo", "userid" : MaxKey },
- "bravo"
- )
The MinKey
and MaxKey
values are reserved specialvalues for comparisons. MinKey
always compares as less thanevery other possible value, while MaxKey
always compares asgreater than every other possible value. The configured ranges capture everyuser for each datacenter
.
Review the changes.
The next time the balancer runs, itsplits andmigrates chunks across theshards respecting the tag ranges and tags.
Once balancing finishes, the shards tagged as alfa
should onlycontain documents with datacenter : alfa
, while shards tagged asbravo
should only contain documents with datacenter : bravo
.
You can review the chunk distribution by running sh.status()
.
Resolve Write Failure
When the application’s default datacenter is down or inaccessible, theapplication changes the datacenter
field to the otherdatacenter.
For example, the application attempts to write the following document to thealfa
datacenter by default:
- {
- "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
- "message_id" : 329620,
- "datacenter" : "alfa",
- "userid" : 123,
- ...
- }
If the application receives an error on attempted write, or if the writeacknowledgement takes too long, the application logs the datacenter asunavailable and alters the datacenter
field to point to the bravo
datacenter.
- {
- "_id" : ObjectId("56f08c457fe58b2e96f595fb"),
- "message_id" : 329620,
- "datacenter" : "bravo",
- "userid" : 123,
- ...
- }
The application periodically checks the alfa
datacenter forconnectivity. If the datacenter is reachable again, the application can resumenormal writes.
Note
It is possible that the original write to datacenter : alfa
succeeded,especially if the error was related to a timeout.If so, the document with message_id : 329620
may now be duplicatedacross both datacenters. Applications must resolve duplicates as a partof read operations.
Resolve Duplicate Documents on Reads
The application’s switching logic allows for potential document duplication.When performing reads, the application resolves any duplicate documents on theapplication layer.
The following query searches for documents where the userid
is 123
.Note that while userid
is part of the shard key, the query does notinclude the datacenter
field, and therefore does not perform atargeted read operation.
- db.collection.find( { "userid" : 123 } )
The results show that the document with message_id
of 329620
has beeninserted into MongoDB twice, probably as a result of a delayed writeacknowledgement.
- {
- "_id" : ObjectId("56f08c447fe58b2e96f595fa"),
- "message_id" : 329620
- "datacenter" : "alfa",
- "userid" : 123,
- data : {...}
- }
- {
- "_id" : ObjectId("56f08c457fe58b2e96f595fb"),
- "message_id" : 329620
- "datacenter" : "bravo",
- "userid" : 123,
- ...
- }
The application can either ignore the duplicates, taking one of the twodocuments, or it can attempt to trim the duplicates until only a singledocument remains.
One method for trimming duplicates is to use theObjectId.getTimestamp()
method to extract the timestamp from the_id
field. The application can then keep either the first documentinserted, or the last document inserted. This assumes the_id
field uses the MongoDB ObjectId
.
For example, using getTimestamp()
on the documentwith ObjectId("56f08c447fe58b2e96f595fa")
returns:
- ISODate("2016-03-22T00:05:24Z")
Using getTimestamp()
on the document withObjectId("56f08c457fe58b2e96f595fb")
returns:
- ISODate("2016-03-22T00:05:25Z")