
$source Stage (Stream Processing)

$source

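The $source stage specifies a connection in the Connection Registry to stream data from. The following connection types are supported:

  • Apache Kafka broker

  • Atlas collection change stream

  • Atlas database change stream

  • Atlas cluster change stream

  • Array of documents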

Note

You can't use Atlas serverless instances as a $source.

To operate on streaming data from an Apache Kafka broker, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "topic" : ["<source-topic>", ...],
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "partitionIdleTimeout": {
      "size": <duration-number>,
      "unit": "<duration-unit>"
    },
    "config": {
      "auto_offset_reset": "<start-event>",
      "group_id": "<group-id>",
      "keyFormat": "<deserialization-type>",
      "keyFormatError": "<error-handling>"
    }
  }
}

The $source stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Required

Label that identifies the connection in the Connection Registry to ingest data from.

topic

string or array of strings

Required

Name of one or more Apache Kafka topics to stream messages from. If you want to stream messages from more than one topic, specify them in an array.

timeField

document

Optional

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

partitionIdleTimeout

document

Optional

Document specifying the amount of time that a partition is allowed to be idle before it is ignored in watermark calculations.

This field is disabled by default. To handle partitions that don't move forward due to idleness, set a value for this field.

partitionIdleTimeout.size

integer

Optional

Number specifying the duration of the partition idle timeout.

partitionIdleTimeout.unit

string

Optional

Unit of time for the duration of the partition idle timeout.

The value of unit can be one of the following:

  • "ms" (millisecond)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

document

Optional

Document containing fields that override various default values.

config.auto_offset_reset

string

Optional

Specifies which event in the Apache Kafka source topic to begin ingestion with. auto_offset_reset takes the following values:

  • end, latest, or largest : to begin ingestion from the latest event in the topic at the time the aggregation is initialized.

  • earliest, beginning, or smallest : to begin ingestion from the earliest event in the topic.

Defaults to latest.

config.group_id

string

Optional

ID of the Apache Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processing instance with an auto-generated ID in the following format:

asp-${streamProcessorId}-consumer

Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This allows you to track the offset lag and progress of the stream processor directly from the Kafka broker consumer group metadata.

config.keyFormat

string

Optional

Data type used to deserialize Apache Kafka key data. Must be one of the following values:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Defaults to binData.

config.keyFormatError

string

Optional

How to handle errors encountered when deserializing Apache Kafka key data. Must be one of the following values:

  • dlq, which writes the document to your Dead Letter Queue.

  • passThrough, which sends the document to the next stage without key data.

Note

Atlas Stream Processing requires that documents in the source data stream be valid JSON or EJSON. Atlas Stream Processing sends the documents that don't meet this requirement to your dead letter queue if you have configured one.
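As a rough illustration of the Apache Kafka form described above, the following sketch builds a $source stage as a plain JavaScript object and checks its enumerated fields against the values listed in the field descriptions. The connection name kafkaProd, the topic names, the createdAt field, and the group ID are hypothetical placeholders, and validateKafkaSource is an illustrative helper, not part of Atlas Stream Processing.

```javascript
// Allowed values, copied from the field descriptions above.
const IDLE_UNITS = ["ms", "second", "minute", "hour", "day"];
const OFFSET_RESETS = ["end", "latest", "largest", "earliest", "beginning", "smallest"];
const KEY_FORMATS = ["binData", "string", "json", "int", "long"];
const KEY_FORMAT_ERRORS = ["dlq", "passThrough"];

// Hypothetical Kafka source: every name and value here is a placeholder.
const kafkaSource = {
  $source: {
    connectionName: "kafkaProd",
    topic: ["orders", "refunds"],
    timeField: { $toDate: "$createdAt" },
    partitionIdleTimeout: { size: 30, unit: "second" },
    config: {
      auto_offset_reset: "earliest",
      group_id: "orders-asp",
      keyFormat: "string",
      keyFormatError: "dlq"
    }
  }
};

// Illustrative helper (not an Atlas API): lists problems found in a Kafka $source stage.
function validateKafkaSource(stage) {
  const src = stage.$source || {};
  const cfg = src.config || {};
  const problems = [];
  const topics = Array.isArray(src.topic) ? src.topic : [src.topic];

  if (typeof src.connectionName !== "string") problems.push("connectionName is required");
  if (topics.length === 0 || topics.some((t) => typeof t !== "string")) {
    problems.push("topic must be a string or an array of strings");
  }
  if (src.timeField !== undefined) {
    const keys = Object.keys(src.timeField);
    const ok = keys.length === 1 && ["$toDate", "$dateFromString"].includes(keys[0]);
    if (!ok) problems.push("timeField must be a $toDate or $dateFromString expression");
  }
  if (src.partitionIdleTimeout !== undefined && !IDLE_UNITS.includes(src.partitionIdleTimeout.unit)) {
    problems.push("partitionIdleTimeout.unit is not a supported unit");
  }
  if (cfg.auto_offset_reset !== undefined && !OFFSET_RESETS.includes(cfg.auto_offset_reset)) {
    problems.push("config.auto_offset_reset is not a supported value");
  }
  if (cfg.keyFormat !== undefined && !KEY_FORMATS.includes(cfg.keyFormat)) {
    problems.push("config.keyFormat is not a supported value");
  }
  if (cfg.keyFormatError !== undefined && !KEY_FORMAT_ERRORS.includes(cfg.keyFormatError)) {
    problems.push("config.keyFormatError is not a supported value");
  }
  return problems;
}

console.log(validateKafkaSource(kafkaSource)); // prints []
```

A local check like this only catches typos in enumerated values before the pipeline is submitted; the stream processor itself remains the authority on what it accepts.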

An Atlas collection change stream allows applications to access real-time data changes on a single collection. To learn how to open a change stream against a collection, see Change Streams.

To operate on streaming data from an Atlas collection change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "db" : "<source-db>",
    "coll" : ["<source-coll>",...],
    "initialSync": {
      "enable": <boolean>,
      "parallelism": <integer>
    },
    "readPreference": "<read-preference>",
    "readPreferenceTags": [
      {"<key>": "<value>"},
      . . .
    ],
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }],
      "maxAwaitTimeMS": <time-ms>
    }
  }
}

The $source stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Conditional

Label that identifies the connection in the Connection Registry to ingest data from.

timeField

document

Optional

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

db

string

Required

Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

coll

string or array of strings

Required

Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. The change streams of these collections act as the streaming data source. If you omit this field, your stream processor sources from a MongoDB database change stream.

initialSync

document

Optional

Document containing fields pertaining to initialSync functionality.

Atlas Stream Processing initialSync allows you to ingest preexisting documents in an Atlas collection as though they were insert changeEvent documents. If you enable initialSync, when you start your stream processor, it will first ingest and process all existing documents in the collection before proceeding to ingest and process new incoming changeEvent documents. Once the initialSync is complete, it doesn't repeat.

If you enable initialSync, you can't use $hoppingWindow, $sessionWindow, or $tumblingWindow stages in your pipeline.

Important

You can use initialSync only on collections where the _id values of the incoming documents are default-generated ObjectId values or ordered int/long values. All _id values must be of the same type.

initialSync.enable

boolean

Conditional

Determines whether or not to enable initialSync. If you declare an initialSync field, you must set this field.

initialSync.parallelism

integer

Optional

Determines the level of parallelism with which to process the initialSync operation. If you do not specify a value, it defaults to 1.

readPreference

document

Optional

Read preference for initialSync operations.

Defaults to primary.

readPreferenceTags

document

Optional

Read preference tags for initialSync operations.

config

document

Optional

Document containing fields that override various default values.

config.startAfter

token

Conditional

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of either config.startAfter or config.startAtOperationTime.

config.startAtOperationTime

timestamp

Conditional

The operation time after which the source should begin reporting.

You can use only one of either config.startAfter or config.startAtOperationTime.

Accepts MongoDB Extended JSON $date or $timestamp values.

config.fullDocument

string

Conditional

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup : Returns only changes on update.

  • required : Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable : Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentOnly

boolean

Conditional

Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentBeforeChange

string

Optional

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off : Omits the fullDocumentBeforeChange field.

  • required : Must return a full document in its before changes state. If a full document in its before changes state is unavailable, the stream processor fails.

  • whenAvailable : Returns a full document in its before changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.pipeline

document

Optional

Specifies an aggregation pipeline to filter change stream output before passing it on for further processing. This pipeline must conform to the parameters described in Modify Change Stream Output.

Important

Each Change Event includes wallTime and clusterTime fields. Atlas Stream Processing stages after $source expect to receive these fields as the processor ingested them. To ensure proper processing of Change Stream data, do not modify these fields in $source.config.pipeline.

config.maxAwaitTimeMS

integer

Optional

Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch.

Defaults to 1000.
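The following sketch shows one way a collection change stream source with initialSync might be written, together with a small local check for three rules stated above: config.startAfter and config.startAtOperationTime are mutually exclusive, initialSync.enable must be set when initialSync is declared, and window stages can't follow a source that enables initialSync. The connection name, the database and collection names, and the data_copy target are placeholders, and checkChangeStreamSource is an illustrative helper rather than an Atlas API.

```javascript
const WINDOW_STAGES = ["$hoppingWindow", "$sessionWindow", "$tumblingWindow"];

// Hypothetical pipeline: all names are placeholders.
const changeStreamPipeline = [
  {
    $source: {
      connectionName: "cluster0-connection",
      db: "sample_weatherdata",
      coll: "data",
      initialSync: { enable: true, parallelism: 2 }
    }
  },
  {
    $merge: {
      into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_copy" }
    }
  }
];

// Illustrative helper (not an Atlas API): applies the rules stated above.
function checkChangeStreamSource(pipeline) {
  const src = (pipeline[0] || {}).$source || {};
  const cfg = src.config || {};
  const problems = [];

  if ("startAfter" in cfg && "startAtOperationTime" in cfg) {
    problems.push("use only one of config.startAfter or config.startAtOperationTime");
  }
  if (src.initialSync !== undefined && typeof src.initialSync.enable !== "boolean") {
    problems.push("initialSync.enable must be set when initialSync is declared");
  }
  if (src.initialSync !== undefined && src.initialSync.enable === true) {
    for (const stage of pipeline.slice(1)) {
      const name = Object.keys(stage)[0];
      if (WINDOW_STAGES.includes(name)) problems.push(name + " can't be combined with initialSync");
    }
  }
  return problems;
}

console.log(checkChangeStreamSource(changeStreamPipeline)); // prints []
```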

An Atlas database change stream allows applications to access real-time data changes on a single database. To learn how to open a change stream against a database, see Change Streams.

To operate on streaming data from an Atlas database change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "db" : "<source-db>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}

The $source stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Conditional

Label that identifies the connection in the Connection Registry to ingest data from.

timeField

document

Optional

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

db

string

Required

Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

config

document

Optional

Document containing fields that override various default values.

config.startAfter

token

Conditional

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of either config.startAfter or config.startAtOperationTime.

config.startAtOperationTime

timestamp

Conditional

The operation time after which the source should begin reporting.

You can use only one of either config.startAfter or config.startAtOperationTime.

Accepts MongoDB Extended JSON $date or $timestamp values.

config.fullDocument

string

Conditional

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup : Returns only changes on update.

  • required : Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable : Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentOnly

boolean

Conditional

Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentBeforeChange

string

Optional

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off : Omits the fullDocumentBeforeChange field.

  • required : Must return a full document in its before changes state. If a full document in its before changes state is unavailable, the stream processor fails.

  • whenAvailable : Returns a full document in its before changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.pipeline

document

Optional

Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output.

Important

Each Change Event includes wallTime and clusterTime fields. Atlas Stream Processing stages after $source expect to receive these fields as the processor ingested them. To ensure proper processing of Change Stream data, do not modify these fields in $source.config.pipeline.

config.maxAwaitTimeMS

integer

Optional

Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch.

Defaults to 1000.
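As a sketch of how config.pipeline might be used with a database change stream, the example below filters change events at the source to insert and update operations, and adds a deliberately naive local check that a filter pipeline does not set or remove wallTime or clusterTime. The connection and database names are placeholders, and modifiesProtectedFields is an illustrative helper that inspects only $set, $addFields, and $unset stages.

```javascript
const PROTECTED_FIELDS = ["wallTime", "clusterTime"];

// Hypothetical database change stream source that keeps only insert and update events.
const databaseSource = {
  $source: {
    connectionName: "cluster0-connection",
    db: "sample_weatherdata",
    config: {
      pipeline: [{ $match: { operationType: { $in: ["insert", "update"] } } }]
    }
  }
};

// Naive illustrative check: only inspects $set, $addFields, and $unset stages.
function modifiesProtectedFields(configPipeline) {
  return configPipeline.some((stage) => {
    const [name, body] = Object.entries(stage)[0];
    if (name === "$set" || name === "$addFields") {
      return Object.keys(body).some((field) => PROTECTED_FIELDS.includes(field));
    }
    if (name === "$unset") {
      // $unset accepts a single field name or an array of field names.
      return [].concat(body).some((field) => PROTECTED_FIELDS.includes(field));
    }
    return false;
  });
}

console.log(modifiesProtectedFields(databaseSource.$source.config.pipeline)); // prints false
console.log(modifiesProtectedFields([{ $unset: "wallTime" }])); // prints true
```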

To operate on streaming data from an entire Atlas cluster change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>" : {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}

The $source stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Conditional

Label that identifies the connection in the Connection Registry to ingest data from.

timeField

document

Optional

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

config

document

Optional

Document containing fields that override various default values.

config.startAfter

token

Conditional

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of either config.startAfter or config.startAtOperationTime.

config.startAtOperationTime

date | timestamp

Conditional

The operation time after which the source should begin reporting.

You can use only one of either config.startAfter or config.startAtOperationTime.

Accepts MongoDB Extended JSON $date or $timestamp values.

config.fullDocument

string

Conditional

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup : Returns only changes on update.

  • required : Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable : Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentOnly

boolean

Conditional

Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentBeforeChange

string

Optional

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off : Omits the fullDocumentBeforeChange field.

  • required : Must return a full document in its before changes state. If a full document in its before changes state is unavailable, the stream processor fails.

  • whenAvailable : Returns a full document in its before changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.pipeline

document

Optional

Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output.

Note that Atlas Stream Processing expects to receive the wallTime and clusterTime fields from each ingested Change Event. To ensure proper processing of Change Stream data, do not modify these fields in $source.config.pipeline.

config.maxAwaitTimeMS

integer

Optional

Maximum time, in milliseconds, to wait for new data changes to report to the change stream cursor before returning an empty batch.

Defaults to 1000.
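To make the start-position options more concrete, here is a minimal sketch that builds a cluster-wide change stream source beginning at a given point in time. It assumes the relaxed Extended JSON date form, { "$date": "<ISO-8601 string>" }, for config.startAtOperationTime. The helper name clusterSourceStartingAt and the connection name are hypothetical.

```javascript
// Illustrative helper (not an Atlas API): builds a cluster change stream
// source that starts reporting at the given JavaScript Date.
function clusterSourceStartingAt(connectionName, startDate) {
  return {
    $source: {
      connectionName,
      config: {
        // Relaxed Extended JSON date: { "$date": "<ISO-8601 string>" }
        startAtOperationTime: { $date: startDate.toISOString() }
      }
    }
  };
}

// No db or coll field: the source covers the whole cluster.
const clusterSource = clusterSourceStartingAt("cluster0-connection", new Date(Date.UTC(2025, 1, 5)));

console.log(JSON.stringify(clusterSource.$source.config));
// prints {"startAtOperationTime":{"$date":"2025-02-05T00:00:00.000Z"}}
```

Because config.startAfter and config.startAtOperationTime are mutually exclusive, the helper sets only one of them.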

To operate on an array of documents, the $source stage has the following prototype form:

{
  "$source": {
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "documents" : [{source-doc},...] | <expression>
  }
}

The $source stage takes a document with the following fields:

Field
Type
Necessity
Description

timeField

document

Optional

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

documents

array

Conditional

Array of documents to use as a streaming data source. The value of this field can either be an array of objects or an expression that evaluates to an array of objects. Do not use this field when using the connectionName field.
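For quick experiments, a document-array source can stand in for a live connection. The sketch below uses made-up sensor readings and takes each document's ts field as the authoritative timestamp. The field names and the usesExactlyOneInput helper are illustrative assumptions, not part of the product.

```javascript
// Hypothetical in-memory readings; field names are placeholders.
const readings = [
  { sensor: "a", temp: 21.5, ts: "2024-08-02T19:00:00Z" },
  { sensor: "b", temp: 19.0, ts: "2024-08-02T19:00:05Z" }
];

const documentsSource = {
  $source: {
    // Use the ts field of each document as its authoritative timestamp.
    timeField: { $dateFromString: { dateString: "$ts" } },
    documents: readings
  }
};

// Illustrative check for the rule above: documents and connectionName
// must not appear together in the same $source stage.
function usesExactlyOneInput(stage) {
  const hasDocuments = "documents" in stage.$source;
  const hasConnection = "connectionName" in stage.$source;
  return hasDocuments !== hasConnection;
}

console.log(usesExactlyOneInput(documentsSource)); // prints true
```

In mongosh connected to a stream processing instance, a pipeline beginning with a stage like this could presumably be passed to sp.process() for an ad hoc run.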

$source must be the first stage of any pipeline it appears in. You can use only one $source stage per pipeline.
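These two placement rules are easy to check before submitting a pipeline. The following is a minimal sketch; checkSourcePlacement is a hypothetical helper and the sample pipelines are made up.

```javascript
// Illustrative helper (not an Atlas API): checks where $source appears in a pipeline.
function checkSourcePlacement(pipeline) {
  const positions = [];
  pipeline.forEach((stage, index) => {
    if ("$source" in stage) positions.push(index);
  });
  if (positions.length > 1) return "only one $source stage is allowed";
  if (positions.length === 1 && positions[0] !== 0) return "$source must be the first stage";
  return "ok";
}

const wellFormed = [{ $source: { documents: [{ a: 1 }] } }, { $match: { a: 1 } }];
const misplaced = [{ $match: { a: 1 } }, { $source: { documents: [{ a: 1 }] } }];

console.log(checkSourcePlacement(wellFormed)); // prints ok
console.log(checkSourcePlacement(misplaced)); // prints $source must be the first stage
```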

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
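The effect of the $match stage can be previewed locally with plain JavaScript. This is only a rough simulation of the predicate dewPoint.value > 5, not the stream processor's own evaluation. The first sample record borrows values from the weather report shown on this page, and the other two are hypothetical.

```javascript
// Sample records: the first borrows values from this page, the rest are made up.
const sampleReports = [
  { callLetters: "CGDS", dewPoint: { quality: "9", value: 25.7 } },
  { callLetters: "HYP1", dewPoint: { quality: "1", value: 5.0 } },
  { callLetters: "HYP2", dewPoint: { quality: "1", value: 3.2 } }
];

// Local stand-in for { '$match': { 'dewPoint.value': { '$gt': 5 } } }.
const passesMatch = (doc) => doc.dewPoint !== undefined && doc.dewPoint.value > 5;

const kept = sampleReports.filter(passesMatch).map((doc) => doc.callLetters);
console.log(kept); // prints [ 'CGDS' ]
```

A report with a dewPoint.value of exactly 5.0 is dropped, because $gt is a strict comparison.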

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  airTemperature: { quality: '1', value: 27.7 },
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.

The following aggregation ingests data from the cluster0-connection source, which connects to an Atlas cluster loaded with the sample dataset. To learn how to create a stream processing instance and add a connection to an Atlas cluster to the connection registry, see Get Started with Atlas Stream Processing. This aggregation runs two stages to open a change stream and record changes to the data collection in the sample_weatherdata database:

  1. The $source stage connects to the cluster0-connection source and opens a change stream against the data collection in the sample_weatherdata database.

  2. The $merge stage writes the change stream documents to an Atlas collection named data_changes in the sample_weatherdata database. If no such collection exists, Atlas creates it.

{
  $source: {
    connectionName: "cluster0-connection",
    db : "sample_weatherdata",
    coll : "data"
  }
},
{
  $merge: {
    into: {
      connectionName: "cluster0-connection",
      db: "sample_weatherdata",
      coll: "data_changes"
    }
  }
}

The following mongosh command deletes a data document:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
  { _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

After the data document is deleted, the stream processor writes the change stream event document to the sample_weatherdata.data_changes collection. To view the documents in the resulting sample_weatherdata.data_changes collection, use mongosh to connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
  {
    _id: {
      _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
    },
    clusterTime: Timestamp({ t: 1738790819, i: 1 }),
    documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
    ns: { db: 'sample_weatherdata', coll: 'data' },
    operationType: 'delete',
    wallTime: ISODate('2025-02-05T21:26:59.313Z')
  }
]
