$merge (aggregation)
Definition
Note
The following page discusses the $merge
stage, whichoutputs the aggregation pipeline results to a collection. For adiscussion of the $mergeObjects
operator which mergesdocuments into a single document, see $mergeObjects
instead.
New in version 4.2.
Writes the results of the aggregation pipeline to a specified collection. The$merge
operator must be the last stage in thepipeline.
The $merge
stage:
- Can output to a collection in the same or different database.
- Creates a new collection if the output collection does not alreadyexist.
- Can incorporate results (insert new documents, merge documents,replace documents, keep existing documents, fail the operation,process documents with a custom update pipeline) into an existingcollection.
- Can output to a sharded collection. Input collection canalso be sharded.For a comparison with the
$out
stage which also outputsthe aggregation results to a collection, seeComparison with $out.
Syntax
$merge
has the following syntax:
- { $merge: {
- into: <collection> -or- { db: <db>, coll: <collection> },
- on: <identifier field> -or- [ <identifier field1>, ...], // Optional
- let: <variables>, // Optional
- whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
- whenNotMatched: <insert|discard|fail> // Optional
- } }
For example:
- { $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
If using all default options for $merge
(including writingto a collection in the same database), you can use the simplified form:
- { $merge: <collection> } // Output collection is in the same database
The $merge
takes a document with the following fields:
Field | Description | ||||||||
---|---|---|---|---|---|---|---|---|---|
into | The output collection. Specify either:-The collection name as a string to output to a collection inthe same database where the aggregation is run. For example:into: "myOutput" -The database and collection name in a document to output to acollection in the specified database. For example:into: { db:"myDB", coll:"myOutput" } Note- If the output collection does not exist, $merge creates the collection: - For a replica set or a standalone, ifthe output database does not exist, $merge also creates the database. - For a sharded cluster, thespecified output database must already exist.- The output collection cannot be the same collection as thecollection being aggregated.- The output collection cannot appear in any other stages ofthe pipeline.- The output collection can be a sharded collection. | ||||||||
on | Optional. Field or fields that act as a unique identifier for adocument. The identifier determines if a results documentmatches an already existing documentin the output collection. Specify either:-A single field name as a string. For example:on: "_id" -A combination of fields in an array. For example:on: [ "date", "customerId" ] The order of the fields in the array does not matter, and youcannot specify the same field multiple times.For the specified field or fields:- The aggregation results documents must contain the field(s)specified in the on , unless the on field is the_id field. If the _id field is missing from aresults document, MongoDB adds it automatically.- The specified field or fields cannot contain a null or an arrayvalue.$merge requires a unique,index with keys that correspond to the on identifier fields. Although the order of the index keyspecification does not matter, the unique index must only containthe on fields as its keys.- The index must also have the same collation as the aggregation’s collation.- The unique index can be a sparseindex.- For output collections that already exist, the correspondingindex must already exist.The default value for on depends on the output collection:-If the output collection does not exist, the onidentifier must be and defaults to the _id field. Thecorresponding unique _id index is automatically created.TipTo use a different on identifier field(s)for a collection that does not exist, you can create thecollection first by creating a unique index on the desiredfield(s). See the section on non-existent outputcollection for an example.-If the existing output collection is unsharded, the on identifier defaults to the _id field.-If the existing output collection is a sharded collection, theon identifier defaults to all the shardkey fields and the _id field. If specifying a differenton identifier, the on must contain all the shard keyfields. | ||||||||
whenMatched | Optional. The behavior of $merge if a result documentand an existing document in the collection have the same valuefor the specified on field(s).You can specify either:-One of the pre-defined action strings:ActionDescription“replace”Replace the existing document in the outputcollection with the matching resultsdocument.When performing a replace, the replacement documentcannot result in a modification of the _id value or,if the output collection is sharded, the shard keyvalue. Otherwise, the operation results in an error.TipTo avoid this error, if the onfield does not include the _id field, remove the_id field in the aggregation results to avoid theerror, such as with a preceding $unset stage, etc.“keepExisting”Keep the existing document in the outputcollection.“merge” (Default)Merge the matching documents (similar to the$mergeObjects operator). - If the results document contains fields not in theexisting document, add these new fields to theexisting document. - If the results document contains fields in the existingdocument, replace the existing field values with thosefrom the results document.For example, if the output collection has the document:{ _id: 1, a: 1, b: 1 } And the aggregation results has the document:{ _id: 1, b: 5, z: 1 } Then, the merged document is:{ _id: 1, a: 1, b: 5, z: 1 } When performing a merge, the merged document cannotresult in a modification of the _id value or, if theoutput collection is sharded, the shard key value.Otherwise, the operation results in an error.TipTo avoid this error, if the onfield does not include the _id field, remove the_id field in the aggregation results to avoid theerror, such as with a preceding $unset stage, etc.“fail”Stop and fail the aggregation operation. Any changes tothe output collection from previous documents are notreverted.-An aggregation pipeline to update the document in thecollection.[ <stage1>, <stage2> … ] The pipeline can only consist of the following stages: - $addFields and its alias $set - $project and its alias $unset - $replaceRoot and its alias $replaceWith The pipeline cannot modify the on field’svalue. For example, if you are matching on the field month ,the pipeline cannot modify the month field.The whenMatched pipeline can directly access the fields ofthe existing documents in the output collection using$<field> .To access the fields from the aggregation results documents,use either: - The built-in $$new variable to access the field, i.e.$$new.<field> . The $$new variable is only availableif the let specification is omitted. - The user-defined variables in the letfield, i.e. $$<uservariable>.<field> . | ||||||||
let | Optional. Specifies variables accessible for use in thewhenMatched pipelineSpecify a document with the variable name and value expression:{ <var_1>: <expression>, …, <var_n>: <expression> } If unspecified, defaults to { new: "$$ROOT" } ; i.e. thewhenMatched pipeline can accessthe $$new variable.To access the let variables in the whenMatched pipeline, use the double dollar signs ($$)prefix and variable name $$<variable> . | ||||||||
whenNotMatched | Optional. The behavior of $merge if a result documentdoes not match an existing document in the out collection.You can specify one of the pre-defined action strings:
|
Considerations
_id Field Generation
If the _id
field is not present in a document from theaggregation pipeline results, the $merge
stage generatesit automatically.
For example, in the following aggregation pipeline,$project
excludes the _id
field from the documentspassed into $merge
. When $merge
writes thesedocuments to the "newCollection"
, $merge
generates anew _id
field and value.
- db.sales.aggregate( [
- { $project: { _id: 0 } },
- { $merge : { into : "newCollection" } }
- ] )
Create a New Collection if Output Collection is Non-Existent
The $merge
operation creates a new collection if thespecified output collection does not exist.
- The output collection is created when
$merge
writesthe first document into the collection and is immediately visible. - If the aggregation fails, any writes completed by the
$merge
before the error will not be rolled back.
Note
For a replica set or a standalone, if theoutput database does not exist, $merge
also createsthe database.
For a sharded cluster, the specifiedoutput database must already exist.
If the output collection does not exist, $merge
requiresthe on identifier to be the _id
field. To use adifferent on
field value for a collection that does not exist, youcan create the collection first by creating a unique index on thedesired field(s) first. For example, if the output collectionnewDailySales201905
does not exist and you want to specify thesalesDate
field as the on identifier:
- db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
- db.sales.aggregate( [
- { $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
- { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
- { $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
- { $merge : { into : "newDailySales201905", on: "salesDate" } }
- ] )
Output to a Sharded Collection
The $merge
stage can output to a sharded collection.When the output collection is sharded, $merge
usesthe _id
field and all the shard key fields as the default on identifier. If you override the default, the on identifier must include all the shard key fields:
- { $merge: {
- into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
- on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
- let: <variables>, // Optional
- whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
- whenNotMatched: <insert|discard|fail> // Optional
- } }
For example, in a database that has sharding enabled
, use the sh.shardCollection()
methodto create a new sharded collection newrestaurants
with thepostcode
field as the shard key.
- sh.enableSharding("exampledb"); // Sharding must be enabled in the database
- sh.shardCollection(
- "exampledb.newrestaurants", // Namespace of the collection to shard
- { postcode: 1 }, // Shard key
- );
The newrestaurants
collection will contain documents withinformation on new restaurant openings by month (date
field) andpostcode (shard key); specifically, the onidentifier is ["date", "postcode"]
(the ordering of the fieldsdoes not matter). Because $merge
requires a unique,index with keys that correspond to the on identifier fields, create the unique index(the ordering of the fields do not matter): [1]
- use exampledb
- db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )
With the sharded collection restaurants
and the unique indexcreated, you can use $merge
to output the aggregationresults to this collection, matching on [ "date", "postcode" ]
as in the following example:
- use exampledb
- db.openings.aggregate([
- { $group: {
- _id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
- restaurants: { $push: "$restaurantName" } } },
- { $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
- { $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
- ])
[1] | The sh.shardCollection() method can also create aunique index on the shard key when passed the { unique: true} option if: the shard key is range-based; the collection is empty; and a uniqueindex on the shard key doesn’t already exist.In the example above, because the on identifier is the shardkey and another field, a separate operation to create thecorresponding index is required. |
Replace Documents ($merge) vs Replace Collection ($out)
$merge
can replace an existing document in the outputcollection if the aggregation results contain a document ordocuments that match based on the onspecification. As such, $merge
can replace all documentsin the existing collection if the aggregation results includematching documents for all existing documents in the collection andyou specify “replace” forwhenMatched.
However, to replace an existing collection regardless of theaggregation results, use $out
instead.
Existing Documents and _id and Shard Key Values
The $merge
errors if the $merge
results in achange to an existing document’s _id
value.
Tip
To avoid this error, if the on field does notinclude the _id
field, remove the _id
field in theaggregation results to avoid the error, such as with a preceding$unset
stage, etc.
Additionally, for a sharded collection, $merge
alsoerrors if it results in a change to the shard key value of anexising document.
Any writes completed by the $merge
before the error willnot be rolled back.
Unique Index Constraints
If the unique index used by $merge
for on field(s) is dropped mid-aggregation, there is noguarantee that the aggregation will be killed. If the aggregationcontinues, there is no guarantee that documents do not haveduplicate on
field values.
If the $merge
attempts to write a document that violatesany unique index on the output collection, the operation errors; forexample:
- Insert a non-matching document that violates a unique index otherthan the index on the on field(s).
- Fail if there is a matchingdocument in the collection. Specifically, the operation attemptsto insert the matching document which violates the unique index onthe on field(s).
- Replace an existing document with a new document thatviolates a unique index other than the index on the on field(s).
- Merge the matching documents thatresults in a document that violates a unique index other than theindex on the on field(s).
Comparison with $out
Restrictions
Restrictions | Description |
---|---|
Output Collection | - The output collection cannot be the same collection as thecollection being aggregated.- The output collection cannot be a collection that appears inany other stages of the pipeline. For example, theaggregation pipeline cannot contain a $lookup stage or a $graphLookup stage that refers to theoutput collection. |
Transactions | An aggregation pipeline cannot use $merge inside atransaction. |
View definitionSeparate from materialized view | View definition cannot include the$merge stage. If the view definition includes nestedpipeline (e.g. the view definition includes $facet stage), this $merge stage restriction applies to thenested pipelines as well. |
$lookup stage | $lookup stage’s nested pipeline cannot include the$merge stage. |
$facet stage | $facet stage’s nested pipeline cannot include the$merge stage. |
"linearizable" read concern | The $merge stage cannot be used in conjunction with readconcern "linearizable" . That is, if you specify"linearizable" read concern fordb.collection.aggregate() , you cannot include the$merge stage in the pipeline. |
Examples
On-Demand Materialized View: Initial Creation
If the output collection does not exist, the $merge
createsthe collection.
For example, a collection named salaries
in the zoo
databaseis populated with the employee salary and department history:
- db.getSiblingDB("zoo").salaries.insertMany([
- { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
- { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
- { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
- { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
- { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
- { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
- { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
- { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
- { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
- { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
- ])
You can use the $group
and $merge
stages toinitially create a collection named budgets
(in the reporting
database) from the data currently in the salaries
collection:
Note
For a replica set or a standalone deployment, if the outputdatabase does not exist, $merge
also creates thedatabase.
For a sharded cluster deployment, the specified output databasemust already exist.
- db.getSiblingDB("zoo").salaries.aggregate( [
- { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
- { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
- ] )
$group
stage to group the salaries by thefiscal_year
anddept
.$merge
stage writes the output of the preceding$group
stage to thebudgets
collection in thereporting
database.
To view the documents in the new budgets
collection:
- db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
The budgets
collection contains the following documents:
- { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
- { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
See also
On-Demand Materialized View: Update/Replace Data
The following example refers to the collections from the previousexample.
The example salaries
collection contains theemployee salary and department history:
- { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
- { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
- { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
- { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
- { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
- { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
- { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
- { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
- { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
- { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
The example budgets
collection contains the cumulative yearlybudgets:
- { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
- { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
During the current fiscal year (2019
in this example), new employeesare added to the salaries
collection and new head counts arepre-allocated for the next year:
- db.getSiblingDB("zoo").salaries.insertMany([
- { "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
- { "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
- { "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
- { "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
- ])
To update the budgets
collection to reflect the newsalary information, the following aggregation pipeline uses:
$match
stage to find all documents withfiscal_year
greater than or equal to2019
.$group
stage to group the salaries by thefiscal_year
anddept
.$merge
to write the result set to thebudgets
collection, replacing documents with the same_id
value (inthis example, a document with the fiscal year and dept). Fordocuments that do not have matches in the collection,$merge
inserts the new documents.
- db.getSiblingDB("zoo").salaries.aggregate( [
- { $match : { fiscal_year: { $gte : 2019 } } },
- { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
- { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
- ] )
After the aggregation is run, view the documents in the budgets
collection:
- db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
The budgets
collection incorporates the new salary data for fiscalyear 2019 and adds new documents for fiscal year 2020:
- { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
- { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
- { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
- { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
- { "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }
See also
Only Insert New Data
To ensure that the $merge
does not overwrite existing datain the collection, set whenMatched tokeepExisting or fail.
The example salaries
collection in the zoo
database containsthe employee salary and department history:
- { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
- { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
- { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
- { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
- { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
- { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
- { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
- { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
- { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
- { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
A collection orgArchive
in the reporting
databasecontains historical departmental organization records for the pastfiscal years. Archived records should not be modified.
- { "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
- { "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
- { "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
- { "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
The orgArchive
collection has a unique compound indexon the fiscal_year
and dept
fields; i.e. there should be atmost one record for the same fiscal year and department combination:
- db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )
At the end of current fiscal year (2019
in this example), thesalaries
collection contain the following documents:
- { "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
- { "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
- { "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
- { "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
- { "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
- { "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
- { "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
- { "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
- { "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
- { "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
- { "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
- { "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
- { "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
- { "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
To update the orgArchive
collection to include the fiscalyear 2019
that has just ended, the following aggregation pipelineuses:
$match
stage to find all documents withfiscal_year
equal to2019
.$group
stage to group the employees by thefiscal_year
anddept
.$project
stage to suppress the_id
field and addseparatedept
andfiscal_year
field. When the documentsare passed to$merge
,$merge
automaticallygenerates a new_id
field for the documents.$merge
to write the result set toorgArchive
.
The $merge
stage matches documents on the dept
and fiscal_year
fields and fails
when matched. That is, if a document already exists for the samedepartment and fiscal year, the $merge
errors.
- db.getSiblingDB("zoo").salaries.aggregate( [
- { $match: { fiscal_year: 2019 }},
- { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
- { $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
- { $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
- ] )
After the operation, the orgArchive
collection contains the followingdocuments:
- { "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
- { "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
- { "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
- { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
- { "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
- { "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }
If the orgArchive
collection already contained a document for2019 for department "A"
and/or "B"
, the aggregationfails because of the duplicate key error. However, any document insertedbefore the error will not be rolled back.
If you specify keepExisting for the matching document, theaggregation does not affect the matching document and does not errorwith duplicate key error. Similarly, if you specifyreplace, theoperation would not fail; however, the operation would replace theexisting document.
Merge Results from Multiple Collections
By default, if a document in the aggregation results matches adocument in the collection, the $merge
stagemerges the documents.
An example collection purchaseorders
is populated with thepurchase order information by quarter and regions:
- db.purchaseorders.insertMany( [
- { _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
- { _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
- { _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
- { _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
- { _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
- { _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
- ] )
Another example collection reportedsales
is populated with thereported sales information by quarter and regions:
- db.reportedsales.insertMany( [
- { _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
- { _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
- { _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
- { _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
- ] )
Assume that, for reporting purposes, you want to view the data byquarter in the following format:
- { "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
- { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
You can use the $merge
to merge in results from thepurchaseorders
collection and the reportedsales
collectionto create a new collection quarterlyreport
.
To create the quarterlyreport
collection, you can use thefollowing pipeline:
- db.purchaseorders.aggregate( [
- { $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
- { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
- ])
- First stage:
- The
$group
stage groups by the quarter and uses$sum
to add theqty
fields into a newpurchased
field. For example:
- { "_id" : "2019Q2", "purchased" : 1700 }
- { "_id" : "2019Q1", "purchased" : 1200 }
- Second stage:
- The
merge
stage writes the documents to thequarterlyreport
collection in the same database. If the stagefinds an existing document in the collection that matcheson the_id
field, the stage merges the matchingdocuments. Otherwise, the stage inserts the document. For theinitial creation, no documents should match.
To view the documents in the collection, run the following operation:
- db.quarterlyreport.find().sort( { _id: 1 } )
The collection contains the following documents:
- { "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
- { "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }
Similarly, run the following aggregation pipeline against thereportedsales
collection to merge the sales results into thequarterlyreport
collection.
- db.reportedsales.aggregate( [
- { $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
- { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
- ])
- First stage:
- The
$group
stage groups by the quarter and uses$sum
to add theqty
fields into a newsales
field. For example:
- { "_id" : "2019Q2", "sales" : 500 }
- { "_id" : "2019Q1", "sales" : 1950 }
- Second stage:
- The
merge
stage writes the documents to thequarterlyreport
collection in the same database. If the stagefinds an existing document in the collection that matcheson the_id
field (the quarter), the stage mergesthe matching documents. Otherwise, the stage inserts the document.
To view the documents in the quarterlyreport
collection afterthe data has been merged, run the following operation:
- db.quarterlyreport.find().sort( { _id: 1 } )
The collection contains the following documents:
- { "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
- { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
Use the Pipeline to Customize the Merge
The $merge
can use a custom update pipeline when documents match. ThewhenMatched pipeline can havethe following stages:
$addFields
and its alias$set
$project
and its alias$unset
$replaceRoot
and its alias$replaceWith
An example collection votes
is populated with the daily votetally. Create the collection with the following documents:s
- db.votes.insertMany([
- { date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
- { date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
- { date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
- { date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
- { date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
- { date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
- ])
Another example collection monthlytotals
has the up-to-datemonthly vote totals. Create the collection with the followingdocument:
- db.monthlytotals.insertOne({ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 } )
At the end of each day, that day’s votes is inserted into thevotes
collection:
- db.votes.insertOne(
- { date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
- )
You can use $merge
with an custom pipeline to update theexisting document in the collection monthlytotals
:
- db.votes.aggregate([
- { $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
- { $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
- { $merge: {
- into: "monthlytotals",
- on: "_id",
- whenMatched: [
- { $addFields: {
- thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
- thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
- } } ],
- whenNotMatched: "insert"
- } }
- ])
- First stage:
- The
$match
stage finds the specific day’s votes. Forexample:
- { "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
- Second stage:
- The
$project
stage sets the_id
field to ayear-month string. For example:
- { "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
- Third stage:
The
merge
stage writes the documents to themonthlytotals
collection in the same database. If the stagefinds an existing document in the collection that matcheson the_id
field, the stage uses a pipeline toadd thethumbsup
votes and thethumbsdown
votes.- This pipeline cannot directly accesses the fields from theresults document. To access the
thumbsup
field and thethumbsdown
field in the results document, the pipeline usesthe$$new
variable; i.e.$$new.thumbsup
and$new.thumbsdown
. - This pipeline can directly accesses the
thumbsup
fieldand thethumbsdown
field in the existing document in thecollection; i.e.$thumbsup
and$thumbsdown
.The resulting document replaces the existing document.
- This pipeline cannot directly accesses the fields from theresults document. To access the
To view documents in the monthlytotals
collection after the mergeoperation, run the following operation:
- db.monthlytotals.find()
The collection contains the following document:
- { "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }