db.collection.watch()
Definition
mongo Shell Method
This page documents the mongo shell method, and does not refer to the MongoDB Node.js driver (or any other driver) method. For the corresponding MongoDB driver API, refer to your specific MongoDB driver documentation instead.
For replica sets and sharded clusters only
Opens a change stream cursor on the collection.
Parameter: pipeline
Type: array
Description: Aggregation pipeline consisting of one or more of the following aggregation stages:
- $addFields
- $match
- $project
- $replaceRoot
- $replaceWith (Available starting in MongoDB 4.2)
- $redact
- $set (Available starting in MongoDB 4.2)
- $unset (Available starting in MongoDB 4.2)
Specify a pipeline to filter/modify the change events output.
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.
Parameter: options
Type: document
Description: Optional. Additional options that modify the behavior of watch().
You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.
The options document can contain the following fields and values:
Field: resumeAfter
Type: document
Description: Optional. Directs watch to attempt resuming notifications starting after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
resumeAfter is mutually exclusive with startAfter and startAtOperationTime.
Field: startAfter
Type: document
Description: Optional. Directs watch to attempt starting a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
startAfter is mutually exclusive with resumeAfter and startAtOperationTime.
New in version 4.2.
Field: fullDocument
Type: string
Description: Optional. By default, watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.
Set fullDocument to "updateLookup" to direct watch() to look up the most current majority-committed version of the updated document. watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.
Field: batchSize
Type: int
Description: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.
Has the same functionality as cursor.batchSize().
Field: maxAwaitTimeMS
Type: int
Description: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.
Defaults to 1000 milliseconds.
Field: collation
Type: document
Description: Optional. Pass a collation document to specify a collation for the change stream cursor.
Starting in MongoDB 4.2, defaults to simple binary comparison if omitted. In earlier versions, change streams opened on a single collection would inherit the collection’s default collation.
Field: startAtOperationTime
Type: Timestamp
Description: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo().
startAtOperationTime is mutually exclusive with resumeAfter and startAfter.
New in version 4.0.
Returns: A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. See Change Events for examples of change event documents.
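Because resumeAfter, startAfter, and startAtOperationTime are mutually exclusive, it can be useful to check an options document before passing it to watch(). The following is a minimal sketch, not part of the mongo shell: checkStartOptions is a hypothetical helper name, and the token variable in the usage comment is assumed to hold a previously saved resume token.

```javascript
// Hypothetical helper (not a shell method): rejects an options document
// that sets more than one of the mutually exclusive starting-point options.
function checkStartOptions(options) {
  const exclusive = ["resumeAfter", "startAfter", "startAtOperationTime"];
  const present = exclusive.filter(function (name) {
    return options[name] !== undefined;
  });
  if (present.length > 1) {
    throw new Error("Mutually exclusive options: " + present.join(", "));
  }
  return options; // unchanged, so the call can be nested inside watch()
}

// Shell usage (assumes `token` holds a saved resume token):
//   db.getSiblingDB("data").sensors.watch(
//     [],
//     checkStartOptions({ resumeAfter: token, fullDocument: "updateLookup" })
//   )
```

The helper returns the options document untouched, so other fields such as batchSize or collation pass through as written.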
Availability
Deployment
db.collection.watch() is available for replica set and sharded cluster deployments:
- For a replica set, you can issue db.collection.watch() on any data-bearing member.
- For a sharded cluster, you must issue db.collection.watch() on a mongos instance.
Storage Engine
You can only use db.collection.watch() with the WiredTiger storage engine.
Read Concern majority Support
Starting in MongoDB 4.2, change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.
In MongoDB 4.0 and earlier, change streams are available only if "majority" read concern support is enabled (default).
Behavior
- db.collection.watch() only notifies on data changes that have persisted to a majority of data-bearing members.
- The change stream cursor remains open until one of the following occurs:
  - The cursor is explicitly closed.
  - An invalidate event occurs; for example, a collection drop or rename.
  - The connection to the MongoDB deployment is closed.
  - If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
Resumability
Unlike the MongoDB drivers, the mongo shell does not automatically attempt to resume a change stream cursor after an error. The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.
db.collection.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated with that operation. If the operation identified by the resume token passed to the resumeAfter or startAfter option has already dropped off the oplog, db.collection.watch() cannot resume the change stream.
See Resume a Change Stream for more information on resuming a change stream.
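Since the shell does not resume on its own, a script that needs this behavior has to track the resume token and reopen the stream itself. The sketch below shows one way this could look. watchWithResume and its parameters are hypothetical names, the function treats every thrown error as resumable (a real script would likely inspect the error first), and it assumes the needed oplog entries are still available.

```javascript
// Hypothetical helper: processes events from collection.watch() and, when
// the cursor (or handleEvent) throws, reopens the stream with resumeAfter
// set to the _id of the last event handled. maxResumes bounds the total
// number of reopen attempts.
function watchWithResume(collection, pipeline, handleEvent, maxResumes) {
  let lastToken = null; // resume token (_id) of the most recent handled event
  let resumes = 0;
  let options = {};
  while (true) {
    try {
      const cursor = collection.watch(pipeline, options);
      while (!cursor.isExhausted()) {
        if (cursor.hasNext()) {
          const event = cursor.next();
          handleEvent(event);
          lastToken = event._id;
        }
      }
      return lastToken; // cursor closed without an error
    } catch (err) {
      if (lastToken === null || resumes >= maxResumes) {
        throw err; // nothing to resume from, or out of attempts
      }
      resumes += 1;
      options = { resumeAfter: lastToken };
    }
  }
}

// Shell usage:
//   watchWithResume(db.getSiblingDB("data").sensors, [], printjson, 1)
```

Passing 1 for maxResumes mirrors the single resume attempt that the drivers make.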
Note
- You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
- If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
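The choice between resumeAfter and startAfter described in this note can be made from the last event a script processed. A minimal sketch, assuming the event carries the operationType and _id fields of a change event; reopenOptions is a hypothetical helper name:

```javascript
// Hypothetical helper: builds the options document for reopening a stream
// after lastEvent, the last change event the script processed.
function reopenOptions(lastEvent) {
  if (lastEvent.operationType === "invalidate") {
    // resumeAfter cannot be used once an invalidate event has closed the
    // stream; startAfter (MongoDB 4.2+) starts a new change stream instead.
    return { startAfter: lastEvent._id };
  }
  return { resumeAfter: lastEvent._id };
}

// Shell usage (assumes `lastEvent` was saved by an earlier loop):
//   db.getSiblingDB("data").sensors.watch([], reopenOptions(lastEvent))
```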
Full Document Lookup of Update Operations
By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.
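Because the delta and the looked-up document can describe different moments in time, it may help to keep them apart when handling an update event. The sketch below assumes the update event fields described in Change Events (operationType, documentKey, updateDescription with updatedFields and removedFields, and fullDocument); summarizeUpdate is a hypothetical helper, not a shell method.

```javascript
// Hypothetical helper: summarizes an update change event, keeping the
// delta separate from the looked-up document.
function summarizeUpdate(event) {
  if (event.operationType !== "update") {
    return null; // only update events carry an updateDescription delta
  }
  const delta = event.updateDescription || {};
  return {
    key: event.documentKey,
    changedFields: Object.keys(delta.updatedFields || {}),
    removedFields: delta.removedFields || [],
    // Set only when the event includes a fullDocument field, for example
    // with fullDocument: "updateLookup". It reflects the document at
    // lookup time, which may be later than the update itself.
    current: event.fullDocument === undefined ? null : event.fullDocument
  };
}

// Shell usage inside an iteration loop:
//   printjson(summarizeUpdate(watchCursor.next()));
```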
Access Control
When running with access control, the user must have the find and changeStream privilege actions on the collection resource. That is, a user must have a role that grants the following privilege:
- { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
The built-in read role provides the appropriate privileges.
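Where the built-in read role grants more than is wanted, a custom role limited to this privilege is one option. The sketch below builds a role document in the shape accepted by the shell's db.createRole() method; changeStreamRole and the role name sensorWatcher are hypothetical, chosen only for illustration.

```javascript
// Hypothetical helper: builds a role document granting only the two
// privilege actions that watch() needs on a single collection.
function changeStreamRole(roleName, dbName, collName) {
  return {
    role: roleName,
    privileges: [
      {
        resource: { db: dbName, collection: collName },
        actions: ["find", "changeStream"]
      }
    ],
    roles: [] // no inherited roles
  };
}

// Shell usage (role name is an assumption):
//   db.getSiblingDB("data").createRole(
//     changeStreamRole("sensorWatcher", "data", "sensors")
//   )
```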
Examples
Open a Change Stream
The following operation opens a change stream cursor against the data.sensors collection:
- watchCursor = db.getSiblingDB("data").sensors.watch()
Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
- while (!watchCursor.isExhausted()){
- if (watchCursor.hasNext()){
- printjson(watchCursor.next());
- }
- }
For complete documentation on change stream output, see Change Events.
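The loop above runs until the cursor closes, which can block an interactive session indefinitely. For ad hoc inspection, a bounded variant may be more convenient. This is only a sketch: takeEvents and its parameters are hypothetical, and it assumes hasNext() returns false when no event arrives within the wait period, as the if guard in the loop above suggests.

```javascript
// Hypothetical helper: reads at most maxEvents events from a change stream
// cursor. Stops early if the cursor closes, or if hasNext() reports no
// event maxEmptyPolls times in a row.
function takeEvents(cursor, maxEvents, maxEmptyPolls) {
  const events = [];
  let emptyPolls = 0;
  while (events.length < maxEvents && !cursor.isExhausted()) {
    if (cursor.hasNext()) {
      events.push(cursor.next());
      emptyPolls = 0; // reset after every delivered event
    } else {
      emptyPolls += 1;
      if (emptyPolls >= maxEmptyPolls) {
        break;
      }
    }
  }
  return events;
}

// Shell usage: collect up to 5 events, giving up after 10 empty polls.
//   takeEvents(watchCursor, 5, 10).forEach(printjson)
```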
Change Stream with Full Document Update Lookup
Set the fullDocument option to "updateLookup" to direct the change stream cursor to look up the most current majority-committed version of the document associated with an update change stream event.
The following operation opens a change stream cursor against the data.sensors collection using the fullDocument : "updateLookup" option.
- watchCursor = db.getSiblingDB("data").sensors.watch(
- [],
- { fullDocument : "updateLookup" }
- )
Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
- while (!watchCursor.isExhausted()){
- if (watchCursor.hasNext()){
- printjson(watchCursor.next());
- }
- }
For any update operation, the change event returns the result of the document lookup in the fullDocument field.
For an example of the full document update output, see change stream update event.
For complete documentation on change stream output, see Change Events.
Change Stream with Aggregation Pipeline Filter
Note
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.
The following operation opens a change stream cursor against the data.sensors collection using an aggregation pipeline to filter only insert events:
- watchCursor = db.getSiblingDB("data").sensors.watch(
- [
- { $match : {"operationType" : "insert" } }
- ]
- )
Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
- while (!watchCursor.isExhausted()){
- if (watchCursor.hasNext()){
- printjson(watchCursor.next());
- }
- }
The change stream cursor only returns change events where the operationType is insert. For complete documentation on change stream output, see Change Events.
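The same $match approach extends to several operation types at once. The following sketch builds such a pipeline; operationFilter is a hypothetical helper name, and the pipeline leaves the event's _id field untouched so the resume token stays intact.

```javascript
// Hypothetical helper: builds a pipeline that keeps only events whose
// operationType is in the given list. It does not modify the event's _id.
function operationFilter(types) {
  return [
    { $match: { operationType: { $in: types } } }
  ];
}

// Shell usage:
//   watchCursor = db.getSiblingDB("data").sensors.watch(
//     operationFilter(["insert", "update", "replace"])
//   )
```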
Resuming a Change Stream
Every document returned by a change stream cursor includes a resume token as the _id field. To resume a change stream, pass the entire _id document of the change event you want to resume from to either the resumeAfter or startAfter option of watch().
The following operation resumes a change stream cursor against the data.sensors collection using a resume token. This assumes that the operation that generated the resume token has not rolled off the cluster’s oplog.
- let watchCursor = db.getSiblingDB("data").sensors.watch();
- let firstChange;
- while (!watchCursor.isExhausted()) {
- if (watchCursor.hasNext()) {
- firstChange = watchCursor.next();
- break;
- }
- }
- watchCursor.close();
- let resumeToken = firstChange._id;
- resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
- [],
- { resumeAfter : resumeToken }
- )
Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
- while (!resumedWatchCursor.isExhausted()){
- if (resumedWatchCursor.hasNext()){
- printjson(resumedWatchCursor.next());
- }
- }
See Resume a Change Stream for complete documentation on resuming a change stream.