Потоки изменений в API Azure Cosmos DB для MongoDB

ПРИМЕНИМО К: Mongodb

Поддержка канала изменений в API Azure Cosmos DB для MongoDB доступна с помощью API потоков изменений. С помощью API потоков изменений приложения могут получать изменения, внесенные в коллекцию или элементы в пределах одного сегмента. На основе полученных результатов можно будет выполнять дальнейшие действия. Изменения элементов в коллекции фиксируются в порядке времени их изменения, и в каждом ключе сегмента обязательно соблюдается порядок сортировки.

Примечание

Чтобы использовать потоки изменений, создайте API Azure Cosmos DB для учетной записи MongoDB с версией сервера 3.6 и выше. При выполнении примеров потока изменений в более ранней версии может появиться следующее сообщение об ошибке: Unrecognized pipeline stage name: $changeStream (Неопознанное имя этапа конвейера: $changeStream).

Примеры

В следующем примере показано, как получить потоки изменений для всех элементов в коллекции. В этом примере создается курсор для просмотра элементов при их вставке, обновлении или замене. Для получения потоков изменений требуются этап $match, этап $project и параметр fullDocument. Наблюдение за операциями удаления с помощью потоков изменений сейчас не поддерживается. В качестве обходного решения в удаляемые элементы можно добавить программный маркер. Например, вы можете добавить атрибут в элемент под названием "deleted". Когда вы захотите удалить элемент, вы можете установить "deleted" (удален) в true и установить TTL для элемента. Так как обновление "deleted" до true является изменением, оно будет отображено в потоке изменений.

var cursor = db.coll.watch(
    [
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
    ],
    { fullDocument: "updateLookup" });

while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
        printjson(cursor.next());
    }
}

Изменения в пределах одного сегмента

В следующем примере показано, как получить изменения элементов в пределах одного сегмента. В этом примере показано получение изменений элементов, ключ сегмента которых равен "a", а значение ключа сегмента равно "1". Допускается ситуация, когда разные клиенты могут одновременно считывать изменения из разных сегментов.

var cursor = db.coll.watch(
    [
        { 
            $match: { 
                $and: [
                    { "fullDocument.a": 1 }, 
                    { "operationType": { $in: ["insert", "update", "replace"] } }
                ]
            }
        },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
    ],
    { fullDocument: "updateLookup" });

Масштабирование потоков изменений

При использовании потоков изменений в большом масштабе рекомендуется равномерно распределить нагрузку. Используйте пользовательскую команду GetChangeStreamTokens, чтобы распределить нагрузку между физическими сегментами и секциями.

Текущие ограничения

При использовании потоков изменений действуют следующие ограничения.

  • Свойства operationType и updateDescription пока не поддерживаются в выходном документе.
  • В настоящее время поддерживаются типы операций insert, update и replace. При этом операция удаления или другие события пока не поддерживаются.

В связи с этими ограничениями этап $match, этап $project и параметры fullDocument являются обязательными, как показано в предыдущих примерах.

В отличие от канала изменений в API Azure Cosmos DB для NoSQL, нет отдельной библиотеки обработчика канала изменений для использования потоков изменений или необходимости в контейнере аренд. В настоящее время поддержка триггеров Функций Azure для обработки потоков изменений отсутствует.

Обработка ошибок

При использовании потоков изменений поддерживаются следующие коды ошибок и сообщения.

  • Код ошибки HTTP: 16500 — когда поток изменений регулируется, он возвращает пустую страницу.

  • NamespaceNotFound (OperationType Invalidate) — при запуске потока изменений в несуществующей коллекции или при удалении коллекции возвращается ошибка NamespaceNotFound. Так как свойство operationType не может быть возвращено в выходном документе, вместо ошибки operationType Invalidate возвращается ошибка NamespaceNotFound.

Дальнейшие действия