Поделиться через


Прием данных из Azure Cosmos DB в Azure Data Explorer

Azure Data Explorer поддерживает прием данных из Azure Cosmos DB для NoSql с помощью канала изменений. Подключение к данным канала изменений Cosmos DB — это конвейер приема данных, который прослушивает канал изменений Cosmos DB и передает данные в таблицу Data Explorer. Канал изменений прослушивает новые и обновленные документы, но не регистрирует удаления. Общие сведения о приеме данных в обозревателе данных Azure см. в разделе Обзор приема данных Azure Data Explorer.

Каждое подключение к данным прослушивает определенный контейнер Cosmos DB и отправляет данные в указанную таблицу (несколько подключений могут выполнять прием в одной таблице). Метод приема поддерживает прием потоковой передачи (если включен) и прием в очереди.

В этой статье вы узнаете, как настроить подключение канала изменений Cosmos DB для приема данных в Azure Data Explorer с помощью управляемого удостоверения системы. Ознакомьтесь с рекомендациями перед началом работы.

Чтобы настроить соединитель, выполните следующие действия.

Шаг 1. Выбор таблицы Azure Data Explorer и настройка сопоставления таблиц

Шаг 2. Создание подключения к данным Cosmos DB

Шаг 3. Проверка подключения к данным

Необходимые компоненты

Шаг 1. Выбор таблицы Azure Data Explorer и настройка сопоставления таблиц

Перед созданием подключения к данным создайте таблицу, в которой будут храниться полученные данные и применять сопоставление, соответствующее схеме в исходном контейнере Cosmos DB. Если вашему сценарию требуется более простое сопоставление полей, можно использовать политики обновления для преобразования и сопоставления данных , полученных из канала изменений.

Ниже показан пример схемы элемента в контейнере Cosmos DB:

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

Чтобы создать таблицу и применить сопоставление таблиц, выполните следующие действия.

  1. В веб-интерфейсе Azure Data Explorer в меню навигации слева выберите "Запрос", а затем выберите базу данных, в которой нужно создать таблицу.

  2. Выполните следующую команду, чтобы создать таблицу с именем TestTable.

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. Выполните следующую команду, чтобы создать сопоставление таблиц.

    Команда сопоставляет пользовательские свойства из документа JSON Cosmos DB со столбцами в таблице TestTable следующим образом:

    Свойство Cosmos DB Столбец таблицы Преобразование
    id Артикул нет
    name Имя. нет
    _ts _ts нет
    _ts _timestamp Используется DateTimeFromUnixSeconds для преобразования _ts (секунды UNIX) в _timestamp (datetime))

    Примечание.

    Рекомендуется использовать следующие столбцы метки времени:

    • _ts. Используйте этот столбец для согласования данных с Cosmos DB.
    • _timestamp. Используйте этот столбец для выполнения эффективных фильтров времени в запросах Kusto. Дополнительные сведения см. в статье "Рекомендации по запросу".
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

Преобразование и сопоставление данных с политиками обновления

Если вашему сценарию требуется более простое сопоставление полей, можно использовать политики обновления для преобразования и сопоставления данных, полученных из канала изменений.

Политики обновления — это способ преобразования данных по мере приема в таблицу. Они написаны в язык запросов Kusto и выполняются в конвейере приема. Их можно использовать для преобразования данных из канала изменений Cosmos DB, например в следующих сценариях:

  • Документы содержат массивы, которые будут проще запрашивать, если они преобразуются в нескольких строках с помощью mv-expand оператора.
  • Вы хотите отфильтровать документы. Например, можно отфильтровать документы по типу where с помощью оператора.
  • У вас есть сложная логика, которая не может быть представлена в сопоставлении таблиц.

Сведения о создании политик обновления и управлении ими см. в обзоре политики обновления.

Шаг 2. Создание подключения к данным Cosmos DB

Для создания соединителя данных можно использовать следующие методы:

  1. В портал Azure перейдите на страницу обзора кластера и перейдите на вкладку "Начало работы".

  2. На плитке приема данных выберите "Создать подключение>к данным Cosmos DB".

    Снимок экрана: вкладка

  3. В области создания подключения к данным Cosmos DB заполните форму сведениями в таблице:

    Снимок экрана: панель подключения к данным с полями формы со значениями.

    Поле Description
    Имя базы данных Выберите базу данных Azure Data Explorer, в которую требуется принять данные.
    Имя подключения к данным Укажите имя подключения к данным.
    Подписка Выберите подписку, содержащую учетную запись NoSQL Cosmos DB.
    Учетная запись Cosmos DB Выберите учетную запись Cosmos DB, из которой требуется принять данные.
    База данных SQL Выберите базу данных Cosmos DB, из которой требуется принять данные.
    Контейнер SQL Выберите контейнер Cosmos DB, из которого требуется принять данные.
    Имя таблицы Укажите имя таблицы Azure Data Explorer, в которую требуется принять данные.
    Имя сопоставления При необходимости укажите имя сопоставления, используемое для подключения к данным.
  4. При необходимости в разделе "Дополнительные параметры " сделайте следующее:

    1. Укажите дату начала получения события. Это время, с которого соединитель начнет прием данных. Если не указать время, соединитель начнет прием данных с момента создания подключения к данным. Рекомендуемый формат даты — стандарт UTC ISO 8601, указанный следующим образом: yyyy-MM-ddTHH:mm:ss.fffffffZ

    2. Выберите назначаемое пользователем и выберите удостоверение. По умолчанию управляемое удостоверение, назначаемое системой, используется подключением. При необходимости можно использовать удостоверение, назначаемое пользователем.

      Снимок экрана: панель подключения к данным с параметрами

  5. Нажмите кнопку "Создать", чтобы обвести подключение к данным.

Шаг 3. Проверка подключения к данным

  1. В контейнер Cosmos DB вставьте следующий документ:

    {
        "name":"Cousteau"
    }
    
  2. В веб-интерфейсе Azure Data Explorer выполните следующий запрос:

    TestTable
    

    Набор результатов должен выглядеть следующим образом:

    Снимок экрана: область результатов с документом приема.

Примечание.

Azure Data Explorer имеет политику агрегирования (пакетной обработки) для приема данных в очереди, предназначенную для оптимизации процесса приема данных. Политика пакетной обработки по умолчанию настроена для запечатывания пакета после того, как одно из следующих условий соответствует пакету: максимальное время задержки в 5 минут, общий размер одного ГБ или 1000 больших двоичных объектов. В связи с этим может возникнуть задержка. Дополнительные сведения см. на странице о политике пакетной обработки. Чтобы уменьшить задержку, настройте таблицу для поддержки потоковой передачи. См. статью Политика потоковой передачи.

Рекомендации

Следующие рекомендации относятся к каналу изменений Cosmos DB:

  • Веб-канал изменений не предоставляет события удаления .

    Канал изменений Cosmos DB содержит только новые и обновленные документы. Если вам нужно знать об удаленных документах, можно настроить канал с помощью мягкого маркера, чтобы пометить документ Cosmos DB как удаленный. Свойство добавляется для обновления событий, указывающих, удален ли документ. Затем можно использовать where оператор в запросах, чтобы отфильтровать их.

    Например, при сопоставлении удаленного свойства с столбцом таблицы с именем IsDeleted можно отфильтровать удаленные документы со следующим запросом:

    TestTable
    | where not(IsDeleted)
    
  • Канал изменений предоставляет только последнее обновление документа.

    Чтобы понять последствия второго рассмотрения, изучите следующий сценарий:

    Контейнер Cosmos DB содержит документы A и B. Изменения свойства, называемого foo , отображаются в следующей таблице:

    Код документа Свойство foo Мероприятие Метка времени документа (_ts)
    а Красный Создание 10
    Б Синий Создание 20
    а Orange Обновить 30
    Объект Розовый Update 40
    Б Фиолетовый Обновить 50
    а Кармин Обновить 50
    Б NeonBlue Обновить 70

    API канала изменений опрашивает соединитель данных через регулярные интервалы, обычно каждые несколько секунд. Каждый опрос содержит изменения, произошедшие в контейнере между вызовами, но только последнюю версию изменений на документ.

    Чтобы проиллюстрировать проблему, рассмотрим последовательность вызовов API с метками времени 15, 35, 55 и 75, как показано в следующей таблице:

    Метка времени вызова API Код документа Свойство foo Метка времени документа (_ts)
    15 а Красный 10
    35 Б Синий 20
    35 а Orange 30
    55 Б Фиолетовый 50
    55 а Кармин 60
    75 Б NeonBlue 70

    Сравнивая результаты API со списком изменений, внесенных в документ Cosmos DB, вы заметите, что они не соответствуют. Событие обновления для документа A, выделенное в таблице изменений в метке времени 40, не отображается в результатах вызова API.

    Чтобы понять, почему событие не отображается, мы рассмотрим изменения в документе A между вызовами API в метки времени 35 и 55. Между этими двумя вызовами документ A изменился дважды следующим образом:

    Код документа Свойство foo Мероприятие Метка времени документа (_ts)
    а Розовый Обновить 40
    а Кармин Обновить 50

    При вызове API в метку времени 55 API-канал изменений возвращает последнюю версию документа. В этом случае последняя версия документа A — это обновление в метке времени 50, которое является обновлением для свойства foo от Pink до Carmine.

    Из-за этого сценария соединитель данных может пропустить некоторые изменения промежуточного документа. Например, некоторые события могут быть пропущены, если служба подключения к данным отключена в течение нескольких минут, или если частота изменений документа выше частоты опроса API. Однако фиксируется последнее состояние каждого документа.

  • Удаление и повторное создание контейнера Cosmos DB не поддерживается

    Azure Data Explorer отслеживает канал изменений, создавая контрольную точку своей "позиции" в канале. Это делается с помощью маркера продолжения на каждом физическом разделе контейнера. При удалении или повторном создании контейнера эти маркеры продолжения становятся недействительными и не сбрасываются; необходимо удалить и заново создать подключение к данным.

Оценка затрат

Сколько при использовании подключения к данным Cosmos DB влияет на использование единиц запросов контейнера Cosmos DB?

Соединитель вызывает API канала изменений Cosmos DB для каждой физической секции контейнера до одного раза в секунду. Следующие затраты связаны с этими вызовами:

Себестоимость Description
Фиксированные затраты Фиксированные затраты составляют около 2 ЕЗ на каждую физическую секцию каждую секунду.
Переменные затраты Переменные затраты составляет около 2% единиц запросов, используемых для записи документов, хотя это может отличаться в зависимости от вашего сценария. Например, если вы напишете 100 документов в контейнер Cosmos DB, стоимость написания этих документов составляет 1000 ЕЗ. Соответствующая стоимость использования соединителя для чтения этих документов составляет около 2 % затрат на их запись, примерно 20 единиц ЕЗ.