適用於:
MongoDB
Azure Cosmos DB 適用於 MongoDB 的 API 中的變更摘要支援,是透過使用變更資料流 API 來提供。 透過使用變更資料流 API,您的應用程式可以取得對集合或單一分區中項目所做的變更。 您稍後可以根據結果採取進一步動作。 系統會依對集合中項目所做變更的修改時間加以擷取,且會針對每個分區索引鍵保證排序次序。
注意
若要使用變更資料流,請使用伺服器 3.6 版或更新版本來建立 Azure Cosmos DB 適用於 MongoDB 的 API 帳戶。 如果您執行針對較早版本的變更資料流程範例,您可能會看到「無法辨識的管線階段名稱:$changeStream」錯誤。
範例
下列範例顯示如何在集合中的所有項目上取得變更資料流。 此範例會建立資料指標,以便在插入、更新或取代項目時加以監看。 需要 $match 階段、$project 階段及 fullDocument 選項來取得變更資料流。 目前不支援使用變更資料流來監看刪除作業。 作為因應措施,您可以於正在刪除的項目上新增虛標記。 例如,您可以在項目中新增名為 "deleted" (已刪除) 的屬性。當您想要刪除項目時,您可以將項目上的 "deleted" 設為 true 並設定 TTL。 由於系統會把將 "deleted" 更新為 true 的動作視為更新,此變更將會顯示在變更資料流中。
var cursor = db.coll.watch(
[
{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
],
{ fullDocument: "updateLookup" });
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
printjson(cursor.next());
}
}
var collection = new MongoClient("<connection-string>")
.GetDatabase("<database-name>")
.GetCollection<BsonDocument>("<collection-name>");
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.OperationType == ChangeStreamOperationType.Insert ||
change.OperationType == ChangeStreamOperationType.Update ||
change.OperationType == ChangeStreamOperationType.Replace
)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
@"{
$project: {
'_id': 1,
'fullDocument': 1,
'ns': 1,
'documentKey': 1
}
}"
);
ChangeStreamOptions options = new ()
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
using IChangeStreamCursor<BsonDocument> enumerator = collection.Watch(
pipeline,
options
);
Console.WriteLine("Waiting for changes...");
while (enumerator.MoveNext())
{
IEnumerable<BsonDocument> changes = enumerator.Current;
foreach(BsonDocument change in changes)
{
Console.WriteLine(change);
}
}
下列範例示範如何在 JAVA 中使用變更資料流功能,如需完整的範例,請參閱此 GitHub 存放庫。 此範例也會示範如何使用 resumeAfter 方法來搜尋上次讀取以來的所有變更。
Bson match = Aggregates.match(Filters.in("operationType", asList("update", "replace", "insert")));
// Pick the field you are most interested in
Bson project = Aggregates.project(fields(include("_id", "ns", "documentKey", "fullDocument")));
// This variable is for second example
BsonDocument resumeToken = null;
// Now time to build the pipeline
List<Bson> pipeline = Arrays.asList(match, project);
//#1 Simple example to seek changes
// Create cursor with update_lookup
MongoChangeStreamCursor<ChangeStreamDocument<org.bson.Document>> cursor = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
Document document = new Document("name", "doc-in-step-1-" + Math.random());
collection.insertOne(document);
while (cursor.hasNext()) {
// There you go, we got the change document.
ChangeStreamDocument<Document> csDoc = cursor.next();
// Let is pick the token which will help us resuming
// You can save this token in any persistent storage and retrieve it later
resumeToken = csDoc.getResumeToken();
//Printing the token
System.out.println(resumeToken);
//Printing the document.
System.out.println(csDoc.getFullDocument());
//This break is intentional but in real project feel free to remove it.
break;
}
cursor.close();
單一分區中的變更
下列範例顯示如何取得對單一分區內的項目所做的變更。 此範例會取得分區索引鍵等於 "a" 且分區索引鍵值等於 "1" 的項目變更。 您可以使用不同的用戶端以平行方式讀取來自不同分區的變更。
var cursor = db.coll.watch(
[
{
$match: {
$and: [
{ "fullDocument.a": 1 },
{ "operationType": { $in: ["insert", "update", "replace"] } }
]
}
},
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
],
{ fullDocument: "updateLookup" });
調整變更資料流大小
當大規模使用變更資料流時,最好將負載平均分散。 運用 GetChangeStreamTokens 自訂命令可將負載分散到多個實體分區/分割區。
目前的限制
使用變更資料流時有下列限制︰
- 輸出文件中尚未支援
operationType 和 updateDescription 屬性。
- 目前支援
insert、update 及 replace 作業類型。 但目前尚未支援刪除作業或其他事件。
基於這些限制,需要 $match 階段、$project 階段,以及 fullDocument 選項,如先前範例所示。
和 Azure Cosmos DB 的 API for NoSQL 中的變更摘要不同,並沒有個別的變更摘要處理器程式庫來取用變更資料流或是需要租用容器。 目前並沒有支援使用 Azure Functions 觸發程序來處理變更資料流。
錯誤處理
使用變更資料流時,支援下列錯誤碼和訊息:
HTTP 錯誤碼 16500 - 當系統對變更資料流進行節流時,其會傳回空白頁面。
NamespaceNotFound (OperationType 失效) - 如果您在不存在的集合上,或是在集合已卸除時執行變更資料流,系統便會傳回 NamespaceNotFound 錯誤。 因為 operationType 屬性無法在輸出文件中傳回,系統不會傳回 operationType Invalidate 錯誤,而是會改為傳回 NamespaceNotFound 錯誤。
下一步