Definition
The $merge stage specifies a connection in the Connection Registry to write messages to. The connection must be an Atlas connection.
A $merge pipeline stage has the following prototype form:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
Syntax
The Atlas Stream Processing version of $merge
uses most of the same fields as the Atlas Data Federation version. Atlas Stream Processing also
uses the following fields that are either unique to its implementation
of $merge, or modified to suit it. To learn more about the fields
shared with Atlas Data Federation $merge, see $merge syntax.
| Field | Necessity | Description |
|---|---|---|
| `into` | Required | Simplified to reflect that Atlas Stream Processing supports only Atlas connections. To learn more, see the description of the Atlas Data Federation `into` field. |
| `whenMatched` | Optional | Extends functionality compared to the Atlas Data Federation `whenMatched` field. When set to `delete`, Atlas Stream Processing deletes the matching document in the target collection. If you use a dynamic expression value, it must resolve to one of the following strings: `replace`, `keepExisting`, `merge`, or `delete`. |
| `whenNotMatched` | Optional | Extends functionality compared to the Atlas Data Federation `whenNotMatched` field. If you use a dynamic expression value, it must resolve to one of the following strings: `insert` or `discard`. |
| `parallelism` | Conditional | Number of threads to which to distribute write operations. Must be an integer value of at least `1`. If you use a dynamic expression value for `into.db` or `into.coll`, you must set `parallelism` to `1`. |
Behavior
Limitations
$merge must be the last stage of any pipeline it appears in. You can
use only one $merge stage per pipeline.
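For illustration, a minimal pipeline sketch (all connection, topic, database, and collection names here are hypothetical) places `$merge` after every other stage:

```
[
  { $source: { connectionName: "kafkaProd", topic: "orders" } },
  { $match: { status: "complete" } },
  { $merge: { into: { connectionName: "atlasProd", db: "sales", coll: "completedOrders" } } }
]
```

Adding any stage after `$merge`, or a second `$merge` stage, makes the pipeline invalid.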
The on field has special requirements for $merge against
sharded collections. To learn more, see
$merge syntax.
If you use a dynamic expression value for into.coll or into.db,
you can't set a parallelism value greater than 1.
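The following sketches (with hypothetical connection, database, collection, and field names) contrast the two cases:

```
// Static target namespace: a parallelism value greater than 1 is allowed.
{
  $merge: {
    into: { connectionName: "atlasProd", db: "sales", coll: "orders" },
    parallelism: 4
  }
}

// Dynamic target collection: parallelism must remain 1 (or be omitted).
{
  $merge: {
    into: { connectionName: "atlasProd", db: "sales", coll: "$orderType" },
    parallelism: 1
  }
}
```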
$merge can't write to time series collections. To write documents to time series
collections, use the $emit stage.
You must have the Atlas admin role to use
$merge on sharded collections.
Dynamic Expressions
You can use a dynamic expression as the value of the following fields:
- `into.db`
- `into.coll`
This enables your stream processor to write messages to different target Atlas collections on a message-by-message basis.
Example
You have a stream of transaction events that generates messages of the following form:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
To sort each of these into a distinct Atlas database and
collection, you can write the following $merge stage:
```
{
  $merge: {
    into: {
      connectionName: "db1",
      db: "$customerStatus",
      coll: "$transactionType"
    }
  }
}
```
This $merge stage:
- Writes the `Very Important Industries` message to an Atlas collection named `VIP.subscription`.
- Writes the `N. E. Buddy` message to an Atlas collection named `employee.requisition`.
- Writes the `Khan Traktor` message to an Atlas collection named `contractor.billableHours`.
You can only use dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.
If you specify a database or collection with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue if configured and processes subsequent messages. If there is no dead letter queue configured, then Atlas Stream Processing skips the message completely and processes subsequent messages.
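For reference, here is a minimal sketch of configuring a dead letter queue when creating a stream processor, assuming a registered Atlas connection named `cluster01` and hypothetical processor, pipeline, database, and collection names:

```
sp.createStreamProcessor(
  "dynamicMergeProcessor",            // hypothetical processor name
  dynamicMergePipeline,               // a pipeline ending in the dynamic $merge stage above
  {
    dlq: {
      connectionName: "cluster01",    // registered Atlas connection (assumed name)
      db: "streamMeta",
      coll: "deadLetterQueue"
    }
  }
)
```

With this option set, messages whose dynamic target expressions can't be evaluated are written to `streamMeta.deadLetterQueue` instead of being skipped.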
Saving Data from Kafka Topics
To save streaming data from multiple Apache Kafka Topics into collections
in your Atlas cluster, use the $merge stage with
the $source stage. The $source stage
specifies the topics from which to read data. The $merge
stage writes the data to the target collection.
Use the following syntax:
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
Examples
Basic Example
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
1. The `$source` stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named `my_weatherdata`, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to `ingestionTime`.
2. The `$match` stage excludes documents that have a `dewPoint.value` of less than or equal to `5.0` and passes along the documents with `dewPoint.value` greater than `5.0` to the next stage.
3. The `$merge` stage writes the output to an Atlas collection named `stream` in the `sample_weatherstream` database. If no such database or collection exist, Atlas creates them.
```
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$match': { 'dewPoint.value': { '$gt': 5 } }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
```
To view the documents in the resulting sample_weatherstream.stream
collection, connect to your Atlas cluster and run the following command:
```
db.getSiblingDB("sample_weatherstream").stream.find()
```
```
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  airTemperature: { quality: '1', value: 27.7 },
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}
```
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
Replicate Change Stream Events
You can use the $merge.whenMatched and $merge.whenNotMatched
parameters to replicate the effects of Change Stream events according
to their operation type.
The following aggregation has four stages:
1. The `$source` stage establishes a connection to the `db1.coll1` collection on an Atlas cluster over the `atlas1` connection.
2. The `$addFields` stage enriches the ingested documents with a `fullDocument._isDelete` field set to the value of an equality check between the `$operationType` value of each document and `"delete"`. This equality check evaluates to a boolean.
3. The `$replaceRoot` stage replaces the document with the value of the enriched `$fullDocument` field.
4. The `$merge` stage writes to `db1.coll1` over the `atlas2` connection, performing two checks on each document:
   - First, the `whenMatched` field checks whether the document matches an existing document in the `db1.coll1` collection by `_id`, the default match field since `on` is not explicitly set. If it does match and `fullDocument._isDelete` is set to `true`, then Atlas deletes the matching document. If it does match and `fullDocument._isDelete` is set to `false`, then Atlas replaces the matching document with the new one from the streaming data source.
   - Second, if Atlas Stream Processing finds no such matching document and `fullDocument._isDelete` is `true`, Atlas discards the document instead of writing it to the collection. If there is no such matching document and `fullDocument._isDelete` is `false`, Atlas inserts the document from the streaming data source into the collection.
```
{
  $source: {
    connectionName: "atlas1",
    db: "db1",
    coll: "coll1",
    fullDocument: "required"
  }
},
{
  $addFields: {
    "fullDocument._isDelete": { $eq: [ "$operationType", "delete" ] }
  }
},
{
  $replaceRoot: { newRoot: "$fullDocument" }
},
{
  $merge: {
    into: { connectionName: "atlas2", db: "db1", coll: "coll1" },
    whenMatched: { $cond: { if: "$_isDelete", then: "delete", else: "replace" } },
    whenNotMatched: { $cond: { if: "$_isDelete", then: "discard", else: "insert" } }
  }
}
```