Поделиться через


Начало работы с сбором измененных данных в аналитическом хранилище для Azure Cosmos DB

Область применения: Nosql MongoDB

Используйте сбор измененных данных (CDC) в аналитическом хранилище Azure Cosmos DB в качестве источника для Фабрика данных Azure или Azure Synapse Analytics для записи конкретных изменений в данные.

Примечание.

Обратите внимание, что связанный интерфейс службы для API Azure Cosmos DB для MongoDB еще недоступен в потоке данных. Однако вы сможете использовать конечную точку документа учетной записи с интерфейсом связанной службы Azure Cosmos DB для NoSQL в качестве рабочей области, пока связанная служба Mongo не будет поддерживаться. В связанной службе NoSQL выберите "Ввод вручную", чтобы указать сведения о учетной записи Cosmos DB и использовать конечную точку документа учетной записи (например: ) вместо конечной точки MongoDB (например: https://[your-database-account-uri].documents.azure.com:443/mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

Необходимые компоненты

Включение аналитического хранилища

Сначала включите Azure Synapse Link на уровне учетной записи и включите аналитическое хранилище для контейнеров, подходящих для рабочей нагрузки.

  1. Включение Azure Synapse Link: включение Azure Synapse Link для учетной записи Azure Cosmos DB

  2. Включите аналитическое хранилище для контейнеров:

    Вариант Руководство
    Включение для конкретного нового контейнера Включение Azure Synapse Link для новых контейнеров
    Включение для конкретного существующего контейнера Включение Azure Synapse Link для существующих контейнеров

Создание целевого ресурса Azure с помощью потоков данных

Функция отслеживания измененных данных аналитического хранилища доступна через функцию потока данных Фабрика данных Azure или Azure Synapse Analytics. В этом руководстве используйте Фабрика данных Azure.

Внимание

Вы также можете использовать Azure Synapse Analytics. Сначала создайте рабочую область Azure Synapse, если у вас еще нет. В созданной рабочей области выберите вкладку "Разработка ", выберите "Добавить новый ресурс" и выберите поток данных.

  1. Создайте Фабрика данных Azure, если у вас еще нет.

    Совет

    По возможности создайте фабрику данных в том же регионе, где находится учетная запись Azure Cosmos DB.

  2. Запустите только что созданную фабрику данных.

  3. В фабрике данных выберите вкладку "Потоки данных" и выберите "Создать поток данных".

  4. Присвойте только что созданному потоку данных уникальное имя. В этом примере поток данных называется cosmoscdc.

    Screnshot нового потока данных с именем cosmoscdc.

Настройка параметров источника для контейнера аналитического хранилища

Теперь создайте и настройте источник для потоков данных из аналитического хранилища учетной записи Azure Cosmos DB.

  1. Выберите Добавить источник.

    Снимок экрана: параметр

  2. В поле имени потока вывода введите cosmos.

    Снимок экрана: именование только что созданного исходного cosmos.

  3. В разделе "Тип источника" выберите "Встроенный".

    Снимок экрана: выбор встроенного типа источника.

  4. В поле набора данных выберите Azure — Azure Cosmos DB для NoSQL.

    Снимок экрана: выбор Azure Cosmos DB для NoSQL в качестве типа набора данных.

  5. Создайте связанную службу для учетной записи с именем cosmoslinkedservice. Выберите имеющуюся учетную запись Azure Cosmos DB для NoSQL в всплывающем окне "Новая связанная служба " и нажмите кнопку "ОК". В этом примере мы выбираем уже существующую учетную запись Azure Cosmos DB для NoSQL и базу данных с именем msdocs-cosmos-sourcecosmicworks.

    Снимок экрана: диалоговое окно

  6. Выберите "Аналитический " для типа хранилища.

    Снимок экрана: аналитический параметр, выбранный для связанной службы.

  7. Перейдите на вкладку " Параметры источника".

  8. В параметрах источника выберите целевой контейнер и включите отладку потока данных. В этом примере контейнер называется products.

    Снимок экрана: выбранный исходный контейнер с именами продуктов.

  9. Выберите отладку потока данных. Во всплывающем диалоговом окне отладки потока данных сохраните параметры по умолчанию и нажмите кнопку "ОК".

    Снимок экрана: параметр переключателя для включения отладки потока данных.

  10. Вкладка " Параметры источника" также содержит другие параметры, которые вы можете включить. В этой таблице описаны следующие параметры:

Вариант Описание
Запись промежуточных обновлений Включите этот параметр, если вы хотите записать журнал изменений в элементы, включая промежуточные изменения между считываниями измененных данных.
Удаление записей Включите этот параметр для записи записей, удаленных пользователем, и примените их к приемнику. Удаление невозможно применить к Azure Data Explorer и приемникам Azure Cosmos DB.
Сбор списков TTL хранилища транзакций Включите этот параметр для записи удаленных записей транзакций Azure Cosmos DB (время в реальном времени) и применения к приемнику. TTL-deletes нельзя применять к приемникам Azure Data Explorer и Azure Cosmos DB.
Пакетная обработка в байтах Этот параметр фактически имеет гигабайты. Укажите размер в гигабайтах, если вы хотите пакетировать веб-каналы отслеживания измененных данных
Дополнительные конфигурации Дополнительные конфигурации аналитического хранилища Azure Cosmos DB и их значения. (например: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Работа с параметрами источника

При проверке любого из Capture intermediate updatesCapture Deltesпараметров и Capture Transactional store TTLs параметров процесс CDC создаст и заполняет __usr_opType поле в приемнике следующими значениями:

значение Описание Вариант
1 UPDATE Запись промежуточных обновлений
2 ВСТАВИТЬ Параметр вставки отсутствует, он включен по умолчанию
3 USER_DELETE Удаление записей
4 TTL_DELETE Сбор списков TTL хранилища транзакций

Если необходимо отличить удаленные записи TTL от документов, удаленных пользователями или приложениями, проверьте оба Capture intermediate updates варианта Capture Transactional store TTLs . Затем необходимо адаптировать процессы ИЛИ приложения ИЛИ запросы CDC для использования __usr_opType в соответствии с потребностями бизнеса.

Совет

Если для конечных потребителей требуется восстановить порядок обновлений с параметром "захват промежуточных обновлений", поле метки времени _ts системы можно использовать в качестве поля упорядочивания.

Создание и настройка параметров приемника для операций обновления и удаления

Сначала создайте простой приемник Хранилище BLOB-объектов Azure, а затем настройте приемник для фильтрации данных только для определенных операций.

  1. Создайте учетную запись Хранилище BLOB-объектов Azure и контейнер, если у вас еще нет учетной записи. В следующих примерах мы будем использовать учетную запись с именем msdocsblobstorage и контейнером с именем output.

    Совет

    Если это возможно, создайте учетную запись хранения в том же регионе, где находится учетная запись Azure Cosmos DB.

  2. Еще в Фабрика данных Azure создайте приемник для измененных данных, полученных из cosmos источника.

    Снимок экрана: добавление нового приемника, подключенного к существующему источнику.

  3. Присвойте приемнику уникальное имя. В этом примере приемник называется storage.

    Снимок экрана: именование созданного хранилища приемника.

  4. В разделе "Тип приемника" выберите "Встроенный". В поле набора данных выберите Delta.

    Снимок экрана: выбор и тип набора данных Inline Delta для приемника.

  5. Создайте связанную службу для учетной записи с помощью Хранилище BLOB-объектов Azure именованной службы storagelinkedservice. Выберите существующую учетную запись Хранилище BLOB-объектов Azure в всплывающем окне "Новая связанная служба" и нажмите кнопку "ОК". В этом примере мы выбираем уже существующую учетную запись Хранилище BLOB-объектов Azure с именемmsdocsblobstorage.

    Снимок экрана: параметры типа службы для новой связанной службы Delta.

    Снимок экрана: диалоговое окно

  6. Выберите вкладку Параметры.

  7. В параметрах задайте путь к папке имени контейнера BLOB-объектов. В этом примере имя контейнера — output.

    Снимок экрана: контейнер больших двоичных объектов с именем выходного набора в качестве целевого объекта приемника.

  8. Найдите раздел метода Update и измените выбранные параметры, чтобы разрешить только операции удаления и обновления. Кроме того, укажите ключевые столбцы в виде списка столбцов, используя поле {_rid} в качестве уникального идентификатора.

    Снимок экрана: методы обновления и ключевые столбцы, указанные для приемника.

  9. Выберите "Проверить" , чтобы убедиться, что вы не сделали никаких ошибок или упущений. Затем выберите "Опубликовать ", чтобы опубликовать поток данных.

    Снимок экрана: параметр для проверки и публикации текущего потока данных.

Планирование выполнения отслеживания измененных данных

После публикации потока данных можно добавить новый конвейер для перемещения и преобразования данных.

  1. Создание нового конвейера. Присвойте конвейеру уникальное имя. В этом примере конвейер называется cosmoscdcpipeline.

    Снимок экрана: новый параметр конвейера в разделе ресурсов.

  2. В разделе "Действия" разверните параметр перемещения и преобразования, а затем выберите поток данных.

    Снимок экрана: параметр действия потока данных в разделе действий.

  3. Присвойте действию потока данных уникальное имя. В этом примере действие называется cosmoscdcactivity.

  4. На вкладке "Параметры" выберите поток данных, созданный cosmoscdc ранее в этом руководстве. Затем выберите размер вычислительных ресурсов на основе тома данных и требуемой задержки для рабочей нагрузки.

    Снимок экрана: параметры конфигурации для потока данных и размера вычислительных ресурсов для действия.

    Совет

    Для добавочных размеров данных больше 100 ГБ рекомендуется использовать настраиваемый размер с числом ядер 32 (+16 ядер драйверов).

  5. Выберите " Добавить триггер". Запланируйте выполнение этого конвейера по курсу, который имеет смысл для рабочей нагрузки. В этом примере конвейер настраивается для выполнения каждые пять минут.

    Снимок экрана: кнопка добавления триггера для нового конвейера.

    Снимок экрана: конфигурация триггера на основе расписания, начиная с 2023 года, которая выполняется каждые пять минут.

    Примечание.

    Минимальное окно повторения для выполнения отслеживания измененных данных составляет одну минуту.

  6. Выберите "Проверить" , чтобы убедиться, что вы не сделали никаких ошибок или упущений. Затем выберите "Опубликовать ", чтобы опубликовать конвейер.

  7. Просмотрите данные, помещенные в контейнер Хранилище BLOB-объектов Azure в качестве выходных данных потока данных с помощью аналитического хранилища Azure Cosmos DB для отслеживания измененных данных.

    Screnshot выходных файлов из конвейера в контейнере Хранилище BLOB-объектов Azure.

    Примечание.

    Время запуска начального кластера может занять до трех минут. Чтобы избежать времени запуска кластера в последующих выполнениях записи измененных данных, настройте время запуска кластера dataflow для динамического значения. Дополнительные сведения о среде выполнения и TTL см. в Фабрика данных Azure среды выполнения интеграции.

Параллельно работающие задания

Размер пакета в исходных параметрах или ситуациях, когда приемник замедляет прием потока изменений, может привести к одновременному выполнению нескольких заданий. Чтобы избежать этой ситуации, задайте для параметра параллелизма значение 1 в параметрах конвейера, чтобы убедиться, что новые выполнения не активируются до завершения текущего выполнения.

Следующие шаги