Change streams in Azure Cosmos DB’s API for MongoDB
APPLIES TO: MongoDB
Change feed support in Azure Cosmos DB’s API for MongoDB is available by using the change streams API. By using the change streams API, your applications can get the changes made to the collection or to the items in a single shard. Later you can take further actions based on the results. Changes to the items in the collection are captured in the order of their modification time and the sort order is guaranteed per shard key.
Note
To use change streams, create the Azure Cosmos DB's API for MongoDB account with server version 3.6 or higher. If you run the change stream examples against an earlier version, you might see the Unrecognized pipeline stage name: $changeStream error.
Examples
The following example shows how to get change streams on all the items in the collection. This example creates a cursor to watch items when they are inserted, updated, or replaced. The $match
stage, $project
stage, and fullDocument
option are required to get the change streams. Watching for delete operations using change streams is currently not supported. As a workaround, you can add a soft marker on the items that are being deleted. For example, you can add an attribute in the item called "deleted." When you'd like to delete the item, you can set "deleted" to true
and set a TTL on the item. Since updating "deleted" to true
is an update, this change will be visible in the change stream.
var cursor = db.coll.watch(
[
{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
],
{ fullDocument: "updateLookup" });
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
printjson(cursor.next());
}
}
Changes within a single shard
The following example shows how to get changes to the items within a single shard. This example gets the changes of items that have shard key equal to "a" and the shard key value equal to "1". It is possible to have different clients reading changes from different shards in parallel.
var cursor = db.coll.watch(
[
{
$match: {
$and: [
{ "fullDocument.a": 1 },
{ "operationType": { $in: ["insert", "update", "replace"] } }
]
}
},
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
],
{ fullDocument: "updateLookup" });
Scaling change streams
When using change streams at scale, it is best to evenly spread the load. Utilize the GetChangeStreamTokens custom command to spread the load across physical shards/partitions.
Current limitations
The following limitations are applicable when using change streams:
- The
operationType
andupdateDescription
properties are not yet supported in the output document. - The
insert
,update
, andreplace
operations types are currently supported. However, the delete operation or other events are not yet supported.
Due to these limitations, the $match stage, $project stage, and fullDocument options are required as shown in the previous examples.
Unlike the change feed in Azure Cosmos DB's API for NoSQL, there is not a separate Change Feed Processor Library to consume change streams or a need for a leases container. There is not currently support for Azure Functions triggers to process change streams.
Error handling
The following error codes and messages are supported when using change streams:
HTTP error code 16500 - When the change stream is throttled, it returns an empty page.
NamespaceNotFound (OperationType Invalidate) - If you run change stream on the collection that does not exist or if the collection is dropped, then a
NamespaceNotFound
error is returned. Because theoperationType
property can't be returned in the output document, instead of theoperationType Invalidate
error, theNamespaceNotFound
error is returned.