Выходные данные Azure Stream Analytics в Azure Cosmos DB

Azure Stream Analytics может выводить данные в формате JSON в Azure Cosmos DB. Он обеспечивает архивацию данных и запросы с низкой задержкой для неструктурированных данных JSON. В этой статье рассматриваются некоторые рекомендации по реализации этой конфигурации (Stream Analytics в Cosmos DB). Если вы не знакомы с Azure Cosmos DB, для начала работы обратитесь к документации Azure Cosmos DB.

Примечание.

  • В настоящее время Stream Analytics поддерживает подключение к Azure Cosmos DB только через API SQL. Другие API Azure Cosmos DB пока не поддерживаются. Если указать модулю Stream Analytics учетные записи Azure Cosmos DB, созданные при помощи других API, это может привести к неправильному сохранению данных.
  • Мы рекомендуем указать для задания уровень совместимости 1.2, если вы используете для вывода данных Azure Cosmos DB.

Основное об Azure Cosmos DB как о целевом объекте выходных данных

Вывод данных Stream Analytics в Azure Cosmos DB позволяет записывать результаты обработки потока данных в контейнеры Azure Cosmos DB в формате JSON. Stream Analytics не создает контейнеры в базе данных. Вам необходимо создать их заранее. После этого вы можете контролировать расходы на выставление счетов за контейнеры Azure Cosmos DB. Кроме того, можно настроить производительность, согласованность и емкость контейнеров напрямую через API Azure Cosmos DB. В следующих разделах подробно описаны некоторые параметры контейнера Azure Cosmos DB.

Настройка согласованности, доступности и задержки

Для соответствия требованиям вашего приложения служба Azure Cosmos DB дает возможность настроить базу данных и контейнеры и создать оптимальный баланс между согласованностью, доступностью, задержкой и пропускной способностью.

В зависимости от того, какие уровни согласованности чтения потребуются вашему сценарию для чтения и записи задержки, вы можете выбирать уровень согласованности в своей учетной записи базы данных. Можно повысить пропускную способность, увеличив количество единиц запроса (EЗ) для контейнера. Кроме того, Azure Cosmos DB по умолчанию активирует синхронное индексирование для каждой операции CRUD в вашем контейнере. Этот параметр полезен для управления производительностью записи и чтения в Azure Cosmos DB. Дополнительные сведения см. в статье об уровнях согласованности базы данных и запросов.

Вставка и обновление Upsert в Stream Analytics

Интеграция Stream Analytics с Azure Cosmos DB позволяет вставлять или обновлять записи в контейнере с помощью заданного столбца Идентификатор документа. Эта операция также называется upsert. Stream Analytics использует подход оптимистичного upsert. Обновления происходят, только если вставка завершается ошибкой с конфликтом идентификатора документа.

При уровне совместимости 1.0 Stream Analytics выполняет это обновление как операцию исправления, что обеспечивает частичные обновления документа. Stream Analytics добавляет новые свойства или постепенно заменяет существующее свойство. Тем не менее изменение значений свойств массива в документе JSON приводит к перезаписи всего массива. То есть массивы не объединяются.

При уровне 1.2 поведение upsert изменяется на вставку или замену документа. Подробно это поведение описано ниже, в разделе об уровне совместимости 1.2.

Если входящий документ JSON имеет существующее поле ID, это поле автоматически используется в качестве столбца Идентификатор документа в Azure Cosmos DB. Все последующие операции записи обрабатываются так же, что приводит к одной из следующих ситуаций.

  • Для уникальных идентификаторов выполняется вставка.
  • При совпадении идентификаторов и значении Идентификатор документа, выставленном по ID, выполняется upsert.
  • При совпадении идентификаторов и невыставленном значении Идентификатор документа после первого документа возникает ошибка.

Если вы хотите сохранить все документы, включая те, которые имеют дублирующийся идентификатор, переименуйте поле идентификатора в запросе (с помощью ключевого слова AS). Позвольте Azure Cosmos DB создать поле идентификатора или замените идентификатор другим значением столбца (с помощью ключевого слова AS или параметра Идентификатор документа).

Секционирование в Azure Cosmos DB

Azure Cosmos DB автоматически масштабирует секции в зависимости от рабочей нагрузки. Поэтому рекомендуется использовать неограниченные контейнеры для секционирования данных. При записи в неограниченные контейнеры Stream Analytics использует столько же параллельных модулей записи, сколько на предыдущем шаге запроса или в схеме секционирования входных данных.

Примечание.

Azure Stream Analytics поддерживает только неограниченные контейнеры с ключами секции на верхнем уровне. Например, /region поддерживается. Вложенные ключи секции (например, /region/name) не поддерживаются.

В зависимости от выбранного ключа секции может появиться следующее предупреждение:

CosmosDB Output contains multiple rows and just one row per partition key. If the output latency is higher than expected, consider choosing a partition key that contains at least several hundred records per partition key.

Важно выбрать свойство ключа секции, которое имеет множество различных значений, и это позволяет равномерно распределять рабочую нагрузку по этим значениям. Как естественный артефакт секционирования, запросы, включающие один и тот же ключ секции, ограничиваются максимальной пропускной способностью одной секции.

Размер хранилища для документов, относящихся к одному значению ключа секции, ограничен 20 ГБ (а ограничение размера физической секции составляет 50 ГБ). Идеальный ключ секции — это тот, который часто отображается в качестве фильтра в запросах и имеет достаточное карта inality, чтобы обеспечить масштабируемость решения.

Ключи секций, используемые для запросов Stream Analytics и Azure Cosmos DB, не должны быть идентичными. Полностью параллельные топологии рекомендуют использовать ключ секции входных данных, так как ключ секции запроса Stream Analytics, PartitionIdно это может быть не рекомендуется для ключа секции контейнера Azure Cosmos DB.

Ключ секции также является границей для транзакций в хранимых процедурах и триггерах для Azure Cosmos DB. Выбирайте ключ секции так, чтобы документы, которые вместе появляются в транзакциях, использовали одно и то же значение ключа секции. Статья Секционирование в Azure Cosmos DB содержит дополнительные сведения о выборе ключа секции.

Для фиксированных контейнеров Azure Cosmos DB после их заполнения Stream Analytics не позволяет увеличить или уменьшить масштаб. Они имеют верхний предел в 10 ГБ и 10 000 ЕЗ/с для пропускной способности. Чтобы перенести данные из фиксированного контейнера в неограниченный контейнер (например, с ключом секции и пропускной способностью не менее 1000 ЕЗ/с), используйте средство миграции данных или библиотеку канала изменений.

Возможность записи в несколько фиксированных контейнеров является устаревшей. Мы не рекомендуем использовать ее для масштабирования задания Stream Analytics.

Улучшенная пропускная способность с уровнем совместимости 1.2

При уровне совместимости 1.2 Stream Analytics поддерживает встроенную интеграцию для массовой записи в Azure Cosmos DB. Эта интеграция позволяет эффективно вести запись в Azure Cosmos DB, обеспечивая максимальную пропускную способность и эффективную обработку запросов регулирования.

Улучшенный механизм записи доступен на новом уровне совместимости из-за разницы в поведении upsert. С уровнями ниже 1.2 upsert выполняет вставку или объединение документа. При уровне 1.2 поведение upsert изменяется на вставку или замену документа.

С уровнями ниже 1.2 Stream Analytics использует пользовательскую хранимую процедуру для массовой операции upsert документов для каждого ключа секции в Azure Cosmos DB. В этом случае пакет записывается как транзакция. Даже если одна запись сталкивается с временной ошибкой (регулирование), приходится повторить весь пакет. Это поведение делает сценарии даже разумным регулирование относительно медленным.

В следующем примере показаны два идентичных задания Stream Analytics, считывающие одни и те же входные данные с концентраторов событий Azure. Оба задания Stream Analytics полностью секционированы сквозным запросом и записывают в идентичные контейнеры Azure Cosmos DB. Метрики слева относятся к заданию с уровнем совместимости 1.0. Метрики справа получены при уровне 1.2. Ключом секции контейнера Azure Cosmos DB является глобально уникальный идентификатор GUID, который поступает из входного события.

Снимок экрана: сравнение метрик Stream Analytics.

Частота входящих событий в концентраторах событий вдвое выше чем та, на прием которой настроены контейнеры Azure Cosmos DB (20 000 ЕЗ), поэтому регулирование ожидается в Azure Cosmos DB. Однако задание с уровнем 1.2 постоянно выполняет запись с более высокой пропускной способностью (выходные события в минуту), а также с меньшим средним использованием единиц потоковой передачи SU%. В вашей среде эта разница зависит от нескольких дополнительных факторов. Эти факторы включают в себя формат события, размер входного события или сообщения, ключи секций и запросы.

Снимок экрана: сравнение метрик Azure Cosmos DB.

С 1.2 Stream Analytics более интеллектуальных в использовании 100 процентов доступной пропускной способности в Azure Cosmos DB с несколькими повторами регулирования или ограничения скорости. Это поведение обеспечивает лучший интерфейс для других рабочих нагрузок, таких как запросы, выполняемые в контейнере одновременно. Если вы хотите увидеть, как Stream Analytics масштабируется с Azure Cosmos DB в качестве приемника для 1000–10 000 сообщений в секунду, попробуйте этот пример проекта Azure.

Пропускная способность вывода данных в Azure Cosmos DB идентична для уровней 1.0 и 1.1. Мы настоятельно рекомендуем использовать уровень совместимости 1.2 в Stream Analytics при работе с Azure Cosmos DB.

Параметры Azure Cosmos DB для выходных данных JSON

При настройке вывода данных Stream Analytics в Azure Cosmos DB появляется следующее окно запроса информации.

Снимок экрана: поля сведений для выходного потока Azure Cosmos DB.

Поле Description
Псевдоним выходных данных Псевдоним для ссылки на эти выходные данные в запросе Stream Analytics.
Отток подписок Подписка Azure.
Код счета Имя или универсальный код ресурса (URI) конечной точки учетной записи Azure Cosmos DB.
Ключ учетной записи Общедоступный ключ доступа к учетной записи Azure Cosmos DB.
База данных Имя базы данных Azure Cosmos DB.
Имя контейнера Имя контейнера, например MyContainer. Должен существовать один контейнер с именем MyContainer.
Код документа Необязательно. Имя столбца в выходных событиях используется как уникальный ключ, на котором должны основываться операции вставки или обновления. Если оставить его пустым, все события вставляются без параметра обновления.

После настройки вывода данных в Azure Cosmos DB можно использовать его в запросе в качестве целевого объекта предложения INTO. Когда вы используете вывод данных в Azure Cosmos DB таким образом, необходимо явно задать ключ секции.

Выходная запись должна содержать столбец с учетом регистра, названный так же, как ключ секции в Azure Cosmos DB. Для достижения большей параллелизации в запросе может потребоваться предложение PARTITION BY, использующее тот же столбец.

Вот пример запроса:

    SELECT TollBoothId, PartitionId
    INTO CosmosDBOutput
    FROM Input1 PARTITION BY PartitionId

Обработка ошибок и повторные попытки

Если временная ошибка, недоступность службы или регулирование происходит, когда Stream Analytics отправляет события в Azure Cosmos DB, Stream Analytics продолжает попытки завершить операцию неограниченно долго. Однако повторных попыток не будет при следующих ошибках:

  • Unauthorized (код ошибки HTTP 401)
  • NotFound (код ошибки HTTP 404)
  • Forbidden (код ошибки HTTP 403)
  • BadRequest (код ошибки HTTP 400)

Распространенные проблемы

  1. В коллекцию добавлено ограничение уникальности индекса, а выходные данные Stream Analytics нарушают это ограничение. Примите меры, чтобы выходные данные Stream Analytics не нарушали ограничения уникальности, или удалите эти ограничения. Дополнительные сведения см. в статье Ограничения уникальности ключей в Azure Cosmos DB.

  2. Столбец PartitionKey не существует.

  3. Столбец Id не существует.

Следующие шаги