Definition
The $source stage specifies a connection in the
Connection Registry to stream data
from. The following connection types are supported:
- Apache Kafka broker 
- MongoDB collection change stream 
- MongoDB database change stream
- MongoDB cluster-wide change stream
- Document array 
Note
You can't use Atlas serverless instances as a
$source.
Syntax
Apache Kafka Broker
To operate on streaming data from an Apache Kafka broker, the
$source stage has the following prototype form:
{
  "$source": {
    "connectionName": "<registered-connection>",
    "topic" : ["<source-topic>", ...],
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "partitionIdleTimeout": {
      "size": <duration-number>,
      "unit": "<duration-unit>"
    },
    "config": {
      "auto_offset_reset": "<start-event>",
      "group_id": "<group-id>",
      "keyFormat": "<deserialization-type>",
      "keyFormatError": "<error-handling>"
    }
  }
}
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Required | Label that identifies the connection in the Connection Registry to ingest data from. |
| topic | string or array of strings | Required | Name of one or more Apache Kafka topics to stream messages from. To stream messages from more than one topic, specify them in an array. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, define it as a $toDate or $dateFromString expression. |
| partitionIdleTimeout | document | Optional | Document specifying the amount of time that a partition is allowed to be idle before it is ignored in watermark calculations. This field is disabled by default. To handle partitions that don't move forward due to idleness, set a value for this field. |
| partitionIdleTimeout.size | integer | Optional | Number specifying the duration of the partition idle timeout. |
| partitionIdleTimeout.unit | string | Optional | Unit of time for the duration of the partition idle timeout. |
| config | document | Optional | Document containing fields that override various default values. |
| config.auto_offset_reset | string | Optional | Specifies which event in the Apache Kafka source topic to begin ingestion with. |
| config.group_id | string | Optional | ID of the Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processing instance with an auto-generated ID. Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This allows you to track the offset lag and progress of the stream processor directly from the Kafka broker consumer group metadata. |
| config.keyFormat | string | Optional | Data type used to deserialize Apache Kafka key data. |
| config.keyFormatError | string | Optional | How to handle errors encountered when deserializing Apache Kafka key data. |
Note
Atlas Stream Processing requires that documents in the source data stream be
valid JSON or EJSON. Atlas Stream Processing sends documents that
don't meet this requirement to your dead letter queue if you have configured one.
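As a minimal sketch of the Kafka prototype above, the following JavaScript builds a `$source` stage object and normalizes its `topic` field, which may be a single string or an array of strings. The connection name `kafkaProd`, the topic names, the `eventTime` field, the consumer group ID, and the `"second"` unit value are hypothetical placeholders chosen for illustration, not values taken from this page.

```javascript
// Hypothetical values: "kafkaProd", the topic names, "$eventTime",
// "orders-processor", and the "second" unit are illustrative assumptions.
const kafkaSource = {
  $source: {
    connectionName: "kafkaProd",
    topic: ["orders", "returns"],
    timeField: { $toDate: "$eventTime" },
    partitionIdleTimeout: { size: 5, unit: "second" },
    config: { group_id: "orders-processor" },
  },
};

// `topic` accepts a string or an array of strings; always return an array.
function topicsOf(stage) {
  const topic = stage.$source.topic;
  return Array.isArray(topic) ? topic : [topic];
}

console.log(topicsOf(kafkaSource));
```

Normalizing `topic` up front lets any later tooling treat the single-topic and multi-topic forms the same way.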
MongoDB Collection Change Stream
An Atlas collection change stream allows applications to access real-time data changes on a single collection. To learn how to open a change stream against a collection, see Change Streams.
To operate on streaming data from an Atlas collection change
stream, the $source stage has the following prototype form:
{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "db" : "<source-db>",
    "coll" : ["<source-coll>",...],
    "initialSync": {
      "enable": <boolean>,
      "parallelism": <integer>
    },
    "readPreference": "<read-preference>",
    "readPreferenceTags": [
      {"<key>": "<value>"},
      . . .
    ],
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }],
      "maxAwaitTimeMS": <time-ms>
    }
  }
}
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, define it as a $toDate or $dateFromString expression. |
| db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. |
| coll | string or array of strings | Required | Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. |
| initialSync | document | Optional | Document containing fields pertaining to initialSync. |
| initialSync.enable | boolean | Conditional | Determines whether or not to enable initialSync. |
| initialSync.parallelism | integer | Optional | Determines the level of parallelism with which to process the initialSync. |
| readPreference | document | Optional | Read preference. |
| readPreferenceTags | document | Optional | Read preference tags. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either config.startAfter or config.startAtOperationTime. |
| config.startAtOperationTime | timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either config.startAfter or config.startAtOperationTime. Accepts MongoDB Extended JSON values. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output before passing it on for further processing. This pipeline must conform to the parameters described in Modify Change Stream Output. |
| config.maxAwaitTimeMS | integer | Optional | Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch. |
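The rule that only one of `startAfter` or `startAtOperationTime` may be set can be checked before a stage is submitted. The following is a minimal sketch, assuming a hypothetical helper named `collectionSource` (not part of any MongoDB API) that assembles a collection change stream `$source` and rejects a configuration that sets both start options.

```javascript
// Hypothetical helper, not a MongoDB API: builds a collection change stream
// $source and enforces that at most one start option is provided.
function collectionSource({ connectionName, db, coll, startAfter, startAtOperationTime }) {
  if (startAfter !== undefined && startAtOperationTime !== undefined) {
    throw new Error("Use only one of startAfter or startAtOperationTime");
  }
  const config = {};
  if (startAfter !== undefined) config.startAfter = startAfter;
  if (startAtOperationTime !== undefined) config.startAtOperationTime = startAtOperationTime;

  const source = { connectionName, db, coll };
  // Only attach config when it overrides something.
  if (Object.keys(config).length > 0) source.config = config;
  return { $source: source };
}

const stage = collectionSource({
  connectionName: "cluster0-connection",
  db: "sample_weatherdata",
  coll: "data",
});
console.log(JSON.stringify(stage));
```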
MongoDB Database Change Stream
An Atlas database change stream allows applications to access real-time data changes on a single database. To learn how to open a change stream against a database, see Change Streams.
To operate on streaming data from an Atlas database change
stream, the $source stage has the following prototype form:
{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "db" : "<source-db>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, define it as a $toDate or $dateFromString expression. |
| db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either config.startAfter or config.startAtOperationTime. |
| config.startAtOperationTime | timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either config.startAfter or config.startAtOperationTime. Accepts MongoDB Extended JSON values. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output. |
| config.maxAwaitTimeMS | integer | Optional | Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch. |
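To illustrate how `config.pipeline` can narrow change stream output at the source, the sketch below builds a database change stream `$source` that keeps only selected operation types. The helper name `databaseSource` is hypothetical, and the connection and database names are borrowed from the examples on this page; whether a given filter stage is permitted depends on the rules in Modify Change Stream Output.

```javascript
// Hypothetical helper, not a MongoDB API: builds a database change stream
// $source whose config.pipeline filters events by operationType.
function databaseSource(connectionName, db, operationTypes) {
  return {
    $source: {
      connectionName,
      db,
      config: {
        // $match on the change event's operationType field.
        pipeline: [{ $match: { operationType: { $in: operationTypes } } }],
      },
    },
  };
}

const dbStage = databaseSource("cluster0-connection", "sample_weatherdata", [
  "insert",
  "delete",
]);
console.log(JSON.stringify(dbStage, null, 2));
```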
MongoDB Cluster-wide Change Stream Source
To operate on streaming data from an entire Atlas cluster change
stream, the $source stage has the following prototype form:
{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, define it as a $toDate or $dateFromString expression. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either config.startAfter or config.startAtOperationTime. |
| config.startAtOperationTime | date or timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either config.startAfter or config.startAtOperationTime. Accepts MongoDB Extended JSON values. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output. |
| config.maxAwaitTimeMS | integer | Optional | Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch. |
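The three change stream prototypes differ in which of `db` and `coll` they include: both for a collection, only `db` for a database, and neither for a cluster-wide stream. The sketch below encodes that reading in a hypothetical helper, `changeStreamScope`; treating `coll` without `db` as an error is an assumption based on `db` being marked Required in the collection form.

```javascript
// Hypothetical helper, not a MongoDB API: infers the change stream scope
// of a $source stage from the presence of db and coll.
function changeStreamScope(stage) {
  const { db, coll } = stage.$source;
  if (coll !== undefined && db === undefined) {
    // Assumption: the collection form always names its database.
    throw new Error("coll requires db");
  }
  if (coll !== undefined) return "collection";
  if (db !== undefined) return "database";
  return "cluster";
}

console.log(changeStreamScope({ $source: { connectionName: "cluster0-connection" } }));
console.log(changeStreamScope({ $source: { connectionName: "cluster0-connection", db: "sample_weatherdata" } }));
console.log(changeStreamScope({ $source: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data" } }));
```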
Document Array
To operate on an array of documents, the $source stage has the
following prototype form:
{
  "$source": {
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "documents" : [{source-doc},...] | <expression>
  }
}
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, define it as a $toDate or $dateFromString expression. |
| documents | array | Conditional | Array of documents to use as a streaming data source. The value of this field can either be an array of objects or an expression that evaluates to an array of objects. Do not use this field when using the connectionName field. |
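A document array source is convenient for trying a pipeline without a registered connection. The following sketch, under the assumption of a hypothetical `observedAt` string field in each document, builds such a `$source` and checks that `documents` is an array of plain objects before use. The helper `isDocumentArray` is illustrative only.

```javascript
// Illustrative data: the observedAt field and the sample values are assumptions.
const arraySource = {
  $source: {
    timeField: { $dateFromString: { dateString: "$observedAt" } },
    documents: [
      { observedAt: "2024-08-02T19:09:18Z", dewPoint: { value: 25.7 } },
      { observedAt: "2024-08-02T19:10:18Z", dewPoint: { value: 3.1 } },
    ],
  },
};

// Hypothetical check: true only for an array whose elements are plain objects.
function isDocumentArray(value) {
  return (
    Array.isArray(value) &&
    value.every((d) => d !== null && typeof d === "object" && !Array.isArray(d))
  );
}

console.log(isDocumentArray(arraySource.$source.documents)); // true
```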
Behavior
$source must be the first stage of any pipeline it appears
in. You can use only one $source stage per pipeline.
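Both placement rules can be checked mechanically before a pipeline is submitted. The following is a minimal sketch with a hypothetical helper, `checkSourcePlacement`, that reports whether a pipeline array has exactly one `$source` stage and whether it comes first.

```javascript
// Hypothetical helper, not a MongoDB API: validates $source placement.
function checkSourcePlacement(pipeline) {
  // Collect the index of every stage document that has a $source key.
  const positions = pipeline
    .map((stage, i) => (Object.keys(stage).includes("$source") ? i : -1))
    .filter((i) => i !== -1);

  if (positions.length === 0) return "no $source stage";
  if (positions.length > 1) return "more than one $source stage";
  if (positions[0] !== 0) return "$source is not the first stage";
  return "ok";
}

const pipeline = [
  { $source: { connectionName: "sample_weatherdata", topic: "my_weatherdata" } },
  { $match: { "dewPoint.value": { $gt: 5 } } },
];
console.log(checkSourcePlacement(pipeline)); // ok
```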
Examples
Kafka Example
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
- The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.
- The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
To view the documents in the resulting sample_weatherstream.stream
collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").stream.find() 
{   _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),   airTemperature: { quality: '1', value: 27.7 },   atmosphericPressureChange: {     quantity24Hours: { quality: '9', value: 99.9 },     quantity3Hours: { quality: '1' },     tendency: { code: '1', quality: '1' }   },   atmosphericPressureObservation: {     altimeterSetting: { quality: '1', value: 1015.9 },     stationPressure: { quality: '1', value: 1021.9 }   },   callLetters: 'CGDS',   dataSource: '4',   dewPoint: { quality: '9', value: 25.7 },   elevation: 9999,   extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },   ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),   liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },   pastWeatherObservationManual: {     atmosphericCondition: { quality: '1', value: '8' },     period: { quality: '9', value: 3 }   },   position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },   precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },   presentWeatherObservationManual: { condition: '53', quality: '1' },   pressure: { quality: '1', value: 1016.3 },   qualityControlProcess: 'V020',   seaSurfaceTemperature: { quality: '9', value: 27.6 },   sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],   skyCondition: {     cavok: 'N',     ceilingHeight: { determination: 'C', quality: '1', value: 6900 }   },   skyConditionObservation: {     highCloudGenus: { quality: '1', value: '05' },     lowCloudGenus: { quality: '9', value: '03' },     lowestCloudBaseHeight: { quality: '9', value: 150 },     lowestCloudCoverage: { quality: '1', value: '05' },     midCloudGenus: { quality: '9', value: '08' },     totalCoverage: { opaque: '99', quality: '1', value: '06' }   },   skyCoverLayer: {     baseHeight: { quality: '9', value: 99999 },     cloudType: { quality: '9', value: '05' },     coverage: { quality: '1', value: '04' }   },   st: 'x+35700-027900',   type: 'SAO',   visibility: {     distance: { quality: '1', value: 4000 },  
   variability: { quality: '1', value: 'N' }   },   waveMeasurement: {     method: 'I',     seaState: { code: '99', quality: '9' },     waves: { height: 99.9, period: 14, quality: '9' }   },   wind: {     direction: { angle: 280, quality: '9' },     speed: { quality: '1', rate: 30.3 },     type: '9'   } } 
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
Change Stream Example
The following aggregation ingests data from the
cluster0-connection source, which connects to an Atlas
cluster loaded with the sample dataset. To learn
how to create a stream processing instance and add a connection to an
Atlas cluster to the connection registry, see
Get Started with Atlas Stream Processing. This aggregation runs two stages to open a
change stream and record changes to the data collection in the
sample_weatherdata database:
- The $source stage connects to the cluster0-connection source and opens a change stream against the data collection in the sample_weatherdata database.
- The $merge stage writes the change stream documents to an Atlas collection named data_changes in the sample_weatherdata database. If no such collection exists, Atlas creates it.
{
  $source: {
    connectionName: "cluster0-connection",
    db : "sample_weatherdata",
    coll : "data"
  }
},
{
  $merge: {
    into: {
      connectionName: "cluster0-connection",
      db: "sample_weatherdata",
      coll: "data_changes"
    }
  }
}
The following mongosh command deletes a data document:
db.getSiblingDB("sample_weatherdata").data.deleteOne(    { _id: ObjectId("5553a99ae4b02cf715120e4b") } ) 
After the data document is deleted, the stream processor writes the change stream event
document to the sample_weatherdata.data_changes collection.
To view the documents in the resulting sample_weatherdata.data_changes
collection, use mongosh to connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherdata").data_changes.find() 
[   {     _id: {       _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'     },     clusterTime: Timestamp({ t: 1738790819, i: 1 }),     documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },     ns: { db: 'sample_weatherdata', coll: 'data' },     operationType: 'delete',     wallTime: ISODate('2025-02-05T21:26:59.313Z')   } ]