Создание крупномасштабных потоков копирования данных с подходом, управляемым метаданными в средстве копирования данных

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

Если вам нужно копировать огромные объемы объектов (например, тысячи таблиц) или загружать данные из большого спектра источников, следует ввести список имен объектов с требуемыми режимами копирования в таблицу управления, а затем использовать параметризованные конвейеры для чтения данных из этой таблицы и их применения к соответствующим заданиям. Таким образом можно оперировать списком объектов (например, дополнять его или сокращать), чтобы упростить копирование и просто обновлять имена объектов в таблице управления, а не повторно развертывать конвейеры. Более того, вы сможете легко проверить, какие объекты скопированы какими конвейерами и триггерами с определенными режимами копирования.

Средство копирования данных в ADF упрощает процесс создания таких управляемых метаданными конвейеров копирования данных. После того как вы выполните интуитивно понятные инструкции по работе с мастером, средство сможет создавать параметризованные конвейеры и сценарии SQL, чтобы соответствующим образом создать внешние таблицы управления. После запуска созданных сценариев для создания таблицы управления в базе данных SQL конвейеры будут считывать метаданные из таблицы управления и автоматически применять их к заданиям копирования.

Создание управляемых метаданными заданий копирования из средства копирования данных

  1. В средстве копирования данных выберите задание копирования, управляемое метаданными (Metadata-driven copy task).

    Необходимо указать подключение и имя таблицы управления, чтобы созданный конвейер считывал метаданные из нее.

    Выбор типа задачи

  2. Введите подключение к базе данных-источнику. Можно также использовать параметризованную связанную службу.

    Выбор параметризованной связанной службы

  3. Выберите имя таблицы для копирования.

    Выберите таблицу

    Примечание.

    Если выбрать хранилище табличных данных, то на следующей странице вы сможете дополнительно выбрать полную загрузку или разностную загрузку. Если выбрать хранилище, то на следующей странице можно будет выбрать только полную загрузку. Добавочная загрузка новых файлов только из хранилища в настоящее время не поддерживается.

  4. Выберите режим загрузки.

    Совет

    Если необходимо выполнить полное копирование всех таблиц, выберите Full load all tables (Полная загрузка всех таблиц). Если нужно выполнить добавочное копирование, можно выбрать параметр configure for each table individually (Настроить для каждой таблицы по отдельности) и выбрать Delta load (Разностная нагрузка), а также имя и начальное значение столбца предела для каждой таблицы.

  5. Выберите целевое хранилище данных.

  6. На странице Settings (Параметры) можно выбрать максимальное число действий копирования, которые будут одновременно копировать данные из исходного хранилища, с помощью параметра Number of concurrent copy tasks (Количество параллельных задач копирования). Значение по умолчанию — 20.

    Страница параметров

  7. После развертывания конвейера можно скопировать или скачать сценарии SQL из пользовательского интерфейса для создания управляющей таблицы и процедуры хранилища.

    Скачивание сценариев

    Вы увидите два сценария SQL.

    • Первый сценарий SQL используется для создания двух управляющих таблиц. В основной таблице управления хранятся список таблиц, путь к файлам или режимы копирования. В таблице управления подключениями хранится значение подключения для хранилища данных, если используется параметризованная связанная служба.
    • Второй сценарий SQL используется для создания процедуры хранилища. Она используется для обновления значения водяного знака в основной таблице управления при каждом завершении заданий инкрементного копирования.
  8. Откройте SSMS, чтобы подключиться к серверу таблицы управления, и выполните два сценария SQL для создания управляющих таблиц и процедуры хранилища.

    Сценарий для создания таблицы управления

  9. Выполните запрос к основной таблице управления и таблице управления подключениями, чтобы просмотреть содержащиеся в них метаданные.

    Основная таблица управленияСкрипт таблицы управления запросами1

    Таблица управления подключениемСкрипт таблицы управления запросами2

  10. Вернитесь на портал ADF, чтобы просмотреть и отладить конвейеры. Вы увидите созданную папку с именем MetadataDrivenCopyTask_#########. Щелкните имя конвейера MetadataDrivenCopyTask###_TopLevel и выберите debug run (Выполнить отладку).

    Вам потребуется ввести следующие параметры.

    Имя параметра Описание
    MaxNumberOfConcurrentTasks Вы всегда можете изменить максимальное число одновременных операций копирования перед запуском конвейера. Значение по умолчанию будет тем, которое вы введите в инструмент копирования данных.
    MainControlTableName Вы всегда можете изменить имя основной таблицы управления, и конвейер будет получать метаданные из этой таблицы перед запуском.
    ТаблицаУправленияПодключениями Вы всегда можете изменить имя таблицы управления подключениями (необязательно), и перед запуском конвейер будет получать из нее метаданные, связанные с подключением к хранилищам данных.
    МаксимальноеКоличествоОбъектовВозвращаемыхИзПоисковойАктивности Чтобы избежать достижения предельного количества операций поиска в выходных данных, можно определить максимальное число объектов, возвращаемых действием поиска. В большинстве случаев значение по умолчанию не требуется изменять.
    windowStart Если вы указали динамическое значение (например, "yyyy/mm/dd") в пути к папке, то этот параметр используется для передачи текущего времени активации в конвейер, чтобы заполнить динамический путь к папке. Когда конвейер активируется триггером по расписанию или периодическим триггером, пользователям не нужно вводить значение этого параметра. Пример значения: 2021-01-25T01:49:28Z
  11. Включите этот триггер, чтобы обеспечить работу конвейеров.

    Включить триггер

Обновление таблицы управления с помощью средства копирования данных

Вы всегда можете напрямую обновить таблицу управления, добавив или удалив объект, который необходимо скопировать, или изменив режим копирования для каждой таблицы. Мы также создаем интерфейс пользователя для инструмента копирования данных, чтобы упростить редактирование таблицы управления.

  1. Щелкните правой кнопкой мыши конвейер верхнего уровня MetadataDrivenCopyTask_xxx_TopLevel, а затем выберите Edit control table (Изменить таблицу управления).

    Изменение таблицы управления 1

  2. Выберите строки из таблицы управления для изменения.

    Изменение таблицы управления 2

  3. Воспользуйтесь инструментом копирования данных, и он создаст для вас новый SQL скрипт. Перезапустите сценарий SQL, чтобы обновить контрольную таблицу.

    Изменение таблицы управления 3

    Примечание.

    Конвейер не будет повторно развернут. Новый созданный сценарий SQL помогает обновить только таблицу управления.

Таблицы управления

Основная таблица управления

Каждая строка в таблице управления содержит метаданные одного копируемого объекта (например, таблицы).

Имя столбца Описание
Идентификатор Уникальный идентификатор копируемого объекта.
НастройкиОбъектаИсточника Метаданные исходного набора данных. Это может быть имя схемы, имя таблицы и т. д. Ниже приведен пример.
ИмяНастроекПодключенияИсточника Имя параметра исходного подключения в таблице управления подключениями. Этот параметр необязателен.
КопироватьНастройкиИсточник Метаданные источника в операции копирования. Это может быть запрос, секции и т. д. Ниже приведен пример.
SinkObjectSettings Метаданные целевого набора данных. Это может быть имя файла, путь к папке, имя таблицы и т. д. Ниже приведен пример. Если указан динамический путь к папке, то значение переменной не будет записываться в таблицу управления.
ПараметрыПодключенияSinkИмя Имя параметра целевого подключения в таблице управления подключениями. Этот параметр необязателен.
CopySinkSettings Метаданные свойства приемника в операции копирования. Это может быть preCopyScript, tableOption и т. д. Ниже приведен пример.
НастройкиКопированияДействий Метаданные свойства переводчика в процессе копирования. Оно используется для определения сопоставления столбцов.
TopLevelPipelineName Имя конвейера верхнего уровня, который может скопировать этот объект.
ИмяТриггера Имя триггера, который может активировать конвейер для копирования этого объекта. При запуске отладки используется имя Sandbox. При выполнении вручную используется имя Manual. Если это запланированное выполнение, то используется имя связанного триггера. Система может принимать несколько имен в качестве входных данных.
НастройкиПоведенияЗагрузкиДанных Полная загрузка против дельта загрузки.
ИдентификаторЗадачи Порядок копирования объектов после элемента TaskId в таблице управления (ORDER BY [TaskId] DESC). Если вам нужно скопировать огромное количество объектов, но количество одновременно копируемых объектов ограничено, можно изменить TaskId для каждого объекта, чтобы решить, какие объекты можно скопировать раньше. Значение по умолчанию равно 0.
ВключениеКопирования Укажите, участвует ли элемент в процессе приема данных. Допустимые значения: 1 (участвует), 0 (не участвует). Значение по умолчанию — 1.

Таблица управления подключениями

Каждая строка в этой таблице управления содержит один параметр подключения к хранилищу данных.

Имя столбца Описание
Имя. Имя параметризованного подключения в основной таблице управления.
ПараметрыПодключения Параметры подключения. Это может быть имя базы данных, имя сервера и т. д.

Конвейеры

Вы увидите три уровня конвейеров, созданных средством копирования данных.

MetadataDrivenCopyTask_xxx_TopLevel

Этот конвейер вычислит общее количество объектов (таблиц и т. д.), которые должны быть скопированы в этом запуске, получит количество последовательных пакетов на основе максимального числа одновременных задач копирования, а затем выполнит другой конвейер для последовательного копирования разных пакетов.

Параметры

Имя параметра Описание
MaxNumberOfConcurrentTasks Вы всегда можете изменить максимальное число одновременных операций копирования перед запуском конвейера. Значение по умолчанию будет тем, которое вы введите в инструмент копирования данных.
MainControlTableName Имя основной таблицы управления. Конвейер получит метаданные из этой таблицы перед запуском.
ТаблицаУправленияПодключениями Имя таблицы управления подключениями (необязательно). Перед запуском конвейер получит метаданные, связанные с подключением к хранилищам данных.
МаксимальноеКоличествоОбъектовВозвращаемыхИзПоисковойАктивности Чтобы избежать достижения предельного количества операций поиска в выходных данных, можно определить максимальное число объектов, возвращаемых действием поиска. В большинстве случаев значение по умолчанию не требуется изменять.
windowStart Если вы указали динамическое значение (например, "yyyy/mm/dd") в пути к папке, то этот параметр используется для передачи текущего времени активации в конвейер, чтобы заполнить динамический путь к папке. Когда конвейер активируется триггером по расписанию или периодическим триггером, пользователям не нужно вводить значение этого параметра. Пример значения: 2021-01-25T01:49:28Z

Деятельность

Название действия Тип активности Описание
GetSumOfObjectsToCopy Поиск Вычисление общего количества объектов (таблиц и т. д.), которые должны быть скопированы при выполнении этого запуска.
КопироватьПакетыОбъектовПоочередно ForEach Определите количество последовательных партий на основе максимального разрешенного количества одновременных задач копирования, а затем запустите другой конвейер для копирования различных партий по очереди.
КопированиеОбъектовВОдномПакете Запуск потока Выполните другой конвейер, чтобы скопировать одну партию объектов. Объекты, относящиеся к этому пакету, будут копироваться параллельно.

MetadataDrivenCopyTask_xxx_ СреднийУровень

Этот конвейер скопирует один пакет объектов. Объекты, относящиеся к этому пакету, будут копироваться параллельно.

Параметры

Имя параметра Описание
МаксимальноеКоличествоОбъектовВозвращаемыхИзПоисковойАктивности Чтобы избежать достижения предельного количества операций поиска в выходных данных, можно определить максимальное число объектов, возвращаемых действием поиска. В большинстве случаев значение по умолчанию не требуется изменять.
TopLevelPipelineName Имя конвейера верхнего уровня.
ИмяТриггера Имя триггера.
Текущий последовательный номер партии Идентификатор серийной партии.
СуммаОбъектовДляКопирования Общее число копируемых объектов.
Количество объектов для копирования в текущей партии Число копируемых объектов в текущем пакете.
MainControlTableName Имя основной таблицы управления.
ТаблицаУправленияПодключениями Имя таблицы управления подключениями.

Деятельность

Название действия Тип активности Описание
РазделитьОдинПакетНаНесколькоГрупп ForEach Разделите объекты из одного набора на несколько параллельных групп, чтобы избежать превышения ограничения выходных данных действия поиска.
ПолучитьОбъектыПоГруппеДляКопирования Поиск Получение объектов (таблиц и т. д.) из управляющей таблицы, которые должны быть скопированы в этой группе. Порядок копирования объектов после элемента TaskId в таблице управления (ORDER BY [TaskId] DESC).
КопироватьОбъектыВОднуГруппу Запуск потока Выполнить другой конвейер, чтобы скопировать объекты из одной группы. Объекты, относящиеся к этой группе, будут копироваться параллельно.

MetadataDrivenCopyTask_xxx_ BottomLevel

Этот конвейер скопирует объекты из одной группы. Объекты, относящиеся к этой группе, будут копироваться параллельно.

Параметры

Имя параметра Описание
ObjectsPerGroupToCopy Число копируемых объектов в текущей группе.
ТаблицаУправленияПодключениями Имя таблицы управления подключениями.
windowStart Используется для передачи текущего времени активации в конвейер, чтобы заполнить динамический путь к папке, если он настроен пользователем.

Деятельность

Название действия Тип активности Описание
СписокОбъектовИзОднойГруппы ForEach Перечисление объектов из одной группы и их последовательная обработка для последующих действий.
МаршрутизацияЗаданийНаОсновеПоведенияЗагрузки переключатель Проверка режима загрузки для каждого объекта. Если это случай по умолчанию или FullLoad, выполните полную загрузку. Если указано значение DeltaLoad, выполняется добавочная загрузка посредством столбца предела для обнаружения изменений.
FullLoadOneObject Копия Сделайте полный моментальный снимок этого объекта и скопируйте его в место назначения.
DeltaLoadOneObject Копия Копируйте только те данные, которые изменены с момента последнего раза, сравнивая значение в столбце водяного знака, чтобы определить изменения.
GetMaxWatermarkValue Поиск Сделайте запрос к исходному объекту данных, чтобы получить максимальное значение из столбца с водяным знаком.
UpdateWatermarkColumnValue StoreProcedure Запись нового значения водяного знака обратно в управляющую таблицу для последующего использования.

Известные ограничения

  • Имя IR, тип базы данных, тип формата файла невозможно параметризовать в ADF. Например, если вы хотите принять данные из Oracle Server и SQL Server, потребуется два разных параметризованных конвейера. Но два набора конвейеров могут совместно использовать одну таблицу управления.
  • OPENJSON используется в сгенерированных сценариях SQL инструментом копирования данных. Если для размещения таблицы управления используется SQL Server, она должна быть SQL Server 2016 (13.x) и более поздних версий для поддержки функции OPENJSON.

Попробуйте эти учебные пособия, в которых используется инструмент Copy Data: