Perform Incremental Map-Reduce
Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.
To perform incremental map-reduce:
- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run a subsequent map-reduce job with:
  - the query parameter that specifies conditions that match only the new documents.
  - the out parameter that specifies the reduce action to merge the new results into the existing output collection.
Consider the following example where you schedule a map-reduce operation on a sessions collection to run at the end of each day.
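The options for the second step can be assembled by a small helper. The following is a minimal sketch in plain JavaScript: the helper name buildIncrementalOptions and its parameters are hypothetical and not part of MongoDB, and it assumes each document carries a ts timestamp field as in the sessions example. Only the option names query, out, and finalize come from the mapReduce command.

```javascript
// Hypothetical helper (not a MongoDB API): builds the options document for
// an incremental map-reduce run.
//   lastRun       - Date of the previous run; only newer documents are matched
//   outCollection - name of the existing output collection to merge into
//   finalizeFn    - optional finalize function
// Assumes every input document has a "ts" timestamp field.
function buildIncrementalOptions(lastRun, outCollection, finalizeFn) {
    var options = {
        query: { ts: { $gt: lastRun } },   // match only the new documents
        out: { reduce: outCollection }     // merge into the existing results
    };
    if (finalizeFn) {
        options.finalize = finalizeFn;
    }
    return options;
}

var opts = buildIncrementalOptions(new Date("2011-11-05T00:00:00Z"), "session_stat");
console.log(JSON.stringify(opts));
```

In the mongo shell, an object built this way could be passed as the third argument to db.collection.mapReduce().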
Data Setup
The sessions collection contains documents that log users’ sessions each day, for example:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
Initial Map-Reduce of Current Collection
Run the first map-reduce operation as follows:
- Define the map function that maps the userid to an object that contains the fields userid, total_time, count, and avg_time:
var mapFunction = function() {
    var key = this.userid;
    var value = {
        userid: this.userid,
        total_time: this.length,
        count: 1,
        avg_time: 0
    };
    emit( key, value );
};
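To see what the map function emits for a single document, it can be exercised outside MongoDB. The following is a sketch in plain JavaScript: the emit function and the emitted array are stand-ins defined here for illustration only (inside MongoDB, emit is supplied by the server), and the sample document uses a plain Date in place of ISODate.

```javascript
// Stand-in for the emit() that MongoDB provides to map functions;
// it only records each key/value pair. Illustration only.
var emitted = [];
function emit(key, value) {
    emitted.push({ key: key, value: value });
}

// Same map function as in the step above.
var mapFunction = function() {
    var key = this.userid;
    var value = {
        userid: this.userid,
        total_time: this.length,
        count: 1,
        avg_time: 0
    };
    emit( key, value );
};

// MongoDB invokes the map function with `this` bound to each document.
var doc = { userid: "a", ts: new Date("2011-11-03T14:17:00Z"), length: 95 };
mapFunction.call(doc);

console.log(emitted[0].key);               // a
console.log(emitted[0].value.total_time);  // 95
```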
- Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to the userid in the mapFunction.
var reduceFunction = function(key, values) {
    var reducedObject = {
        userid: key,
        total_time: 0,
        count: 0,
        avg_time: 0
    };
    values.forEach( function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    } );
    return reducedObject;
};
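The incremental step depends on the reduce function accepting its own output as input: with the reduce output action, MongoDB applies the reduce function to the existing result and the new result for each key. A quick check in plain JavaScript, run outside MongoDB with sample values taken from the example data, suggests that reducing in two stages gives the same totals as reducing everything at once.

```javascript
// Same reduce function as in the step above.
var reduceFunction = function(key, values) {
    var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach(function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    });
    return reducedObject;
};

// Three mapped values for user "a".
var v1 = { userid: "a", total_time: 95,  count: 1, avg_time: 0 };
var v2 = { userid: "a", total_time: 105, count: 1, avg_time: 0 };
var v3 = { userid: "a", total_time: 100, count: 1, avg_time: 0 };

// Reduce everything in one call.
var allAtOnce = reduceFunction("a", [v1, v2, v3]);

// Reduce the first two values, then feed that result back in with the third.
var partial = reduceFunction("a", [v1, v2]);
var inTwoStages = reduceFunction("a", [partial, v3]);

console.log(allAtOnce.total_time, allAtOnce.count);      // 300 3
console.log(inTwoStages.total_time, inTwoStages.count);  // 300 3
```

Because the function only sums total_time and count, the order and grouping of the inputs do not affect the result.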
- Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to set the avg_time field and returns the modified document.
var finalizeFunction = function (key, reducedValue) {
    if (reducedValue.count > 0)
        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
};
- Perform map-reduce on the sessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stat. If the session_stat collection already exists, the operation will replace the contents:
db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         out: "session_stat",
                         finalize: finalizeFunction
                       }
                     )
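To check the values this first run should produce, the map, group, reduce, and finalize stages can be imitated in memory. This is a sketch in plain JavaScript, not how MongoDB executes the job: runMapReduce is a hypothetical helper, the map step is inlined rather than calling emit, and the ts field is omitted from the sample documents because the calculation does not use it.

```javascript
var reduceFunction = function(key, values) {
    var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach(function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    });
    return reducedObject;
};

var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
};

// Hypothetical in-memory stand-in for the map-reduce job (not a MongoDB API).
function runMapReduce(docs, reduceFn, finalizeFn) {
    var groups = {};
    docs.forEach(function(doc) {
        // Inlined equivalent of mapFunction: one value per session, keyed by userid.
        var value = { userid: doc.userid, total_time: doc.length, count: 1, avg_time: 0 };
        (groups[doc.userid] = groups[doc.userid] || []).push(value);
    });
    var out = {};
    Object.keys(groups).forEach(function(key) {
        // This sketch always calls reduce, even for a single value; for this
        // reduce function the result is the same either way.
        out[key] = finalizeFn(key, reduceFn(key, groups[key]));
    });
    return out;
}

// The eight sessions from the data setup, without the ts field.
var sessions = [
    { userid: "a", length: 95 },  { userid: "b", length: 110 },
    { userid: "c", length: 120 }, { userid: "d", length: 45 },
    { userid: "a", length: 105 }, { userid: "b", length: 120 },
    { userid: "c", length: 130 }, { userid: "d", length: 65 }
];

var sessionStat = runMapReduce(sessions, reduceFunction, finalizeFunction);
console.log(sessionStat.a);  // total_time 200, count 2, avg_time 100
console.log(sessionStat.d);  // total_time 110, count 2, avg_time 55
```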
Subsequent Incremental Map-Reduce
Later, as the sessions collection grows, you can run additional map-reduce operations. For example, add new documents to the sessions collection:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat, but reduce the contents with the results of the incremental map-reduce:
db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                         out: { reduce: "session_stat" },
                         finalize: finalizeFunction
                       }
                     );
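The merge performed by the reduce output action can be imitated in memory to see what session_stat should hold afterwards. This is a sketch in plain JavaScript under two assumptions: the existing values are the results of the first run over the 2011-11-03 and 2011-11-04 sessions, and finalize is applied to each merged value. The variable names existing, incoming, and merged are illustrative only.

```javascript
var reduceFunction = function(key, values) {
    var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach(function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    });
    return reducedObject;
};

var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
};

// Values assumed to be in session_stat after the initial run.
var existing = {
    a: { userid: "a", total_time: 200, count: 2, avg_time: 100 },
    b: { userid: "b", total_time: 230, count: 2, avg_time: 115 },
    c: { userid: "c", total_time: 250, count: 2, avg_time: 125 },
    d: { userid: "d", total_time: 110, count: 2, avg_time: 55 }
};

// Values mapped from the four new 2011-11-05 sessions.
var incoming = {
    a: { userid: "a", total_time: 100, count: 1, avg_time: 0 },
    b: { userid: "b", total_time: 115, count: 1, avg_time: 0 },
    c: { userid: "c", total_time: 125, count: 1, avg_time: 0 },
    d: { userid: "d", total_time: 55,  count: 1, avg_time: 0 }
};

// For each key, reduce the existing value together with the new one,
// then recompute the average with finalize.
var merged = {};
Object.keys(incoming).forEach(function(key) {
    var values = existing[key] ? [existing[key], incoming[key]] : [incoming[key]];
    merged[key] = finalizeFunction(key, reduceFunction(key, values));
});

console.log(merged.a);  // total_time 300, count 3, avg_time 100
console.log(merged.b);  // total_time 345, count 3, avg_time 115
```

The stale avg_time carried by the existing values is harmless here because the reduce function ignores it and finalize recomputes it from the merged totals.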