Разностное копирование из базы данных с использованием контрольной таблицы

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этой статье описывается шаблон, доступный для добавочной загрузки новых или обновленных строк из таблицы базы данных в Azure с помощью внешней контрольной таблицы, в которой хранится значение верхнего предела.

Этот шаблон требует, чтобы схема базы данных-источника содержала столбец метки времени или добавочный ключ для идентификации новых или обновленных строк.

Примечание.

Если в базе данных-источнике есть столбец метки времени для идентификации новых или обновленных строк, но вы не хотите создавать внешнюю контрольную таблицу для разностного копирования, то вы можете использовать средство "Копирование данных" Фабрики данных Azure, чтобы получить конвейер. Это средство использует запланированное время в качестве переменной для считывания только новых строк из базы данных-источника.

Информация о шаблоне решения

Этот шаблон сначала извлекает старое значение предела, а затем сравнивает его с текущим значением. После этого из базы данных-источника копируются только изменения на основе сравнения двух значений пределов. Наконец, новое значение верхнего предела сохраняется во внешнюю контрольную таблицу для разностной загрузки данных в следующий раз.

Шаблон состоит из четырех действий.

  • Действие поиска для получения старого значения верхнего предела, хранящегося во внешней контрольной таблице.
  • Еще одно действие поиска для получения текущего значения верхнего предела из базы данных-источника.
  • Действие копирования копирует в целевое хранилище только изменения из базы данных-источника. Запрос, используемый для определения изменений в базе данных-источнике, похож на запрос SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > "последний верхний предел" и TIMESTAMP_Column <= "текущий верхний предел".
  • Процедура SqlServerStoredProcedure для записи текущего значения верхнего предела во внешнюю контрольную таблицу для разностного копирования в следующий раз.

Ниже описаны параметры, которые определяет шаблон:

  • Data_Source_Table_Name — это имя таблицы в базе данных-источнике, из которой необходимо загрузить данные.
  • Data_Source_WaterMarkColumn — имя столбца в исходной таблице, который используется для обнаружения новых или обновленных строк. Этот столбец обычно имеет тип datetime, INT или аналогичный.
  • Data_Destination_Folder_Path — это путь к корневой папке в целевом хранилище, куда копируются данные.
  • Data_Destination_Directory — это путь к каталогу в корневой папке в целевом хранилище, куда копируются данные.
  • Data_Destination_Table_Name — место, куда копируются данные в целевом хранилище (применяется, если в качестве места назначения данных выбрано "Azure Synapse Analytics").
  • Data_Destination_Folder_Path — место, куда копируются данные в целевом хранилище (применяется, если в качестве места назначения данных выбрано "Файловая система" или "Azure Data Lake Storage 1-го поколения").
  • Control_Table_Table_Name — это имя внешней контрольной таблицы для хранения значения верхнего предела.
  • Control_Table_Column_Name — это имя столбца во внешней контрольной таблице, в которой хранится значение верхнего предела.

Использование шаблона решения

  1. Изучите исходную таблицу, которую вы хотите загрузить, и определите столбец верхнего предела, который можно использовать для идентификации новых или обновленных строк. Этот столбец может иметь тип datetime, INT или аналогичный. Значение этого столбца увеличивается по мере добавления новых строк. В следующем примере исходной таблицы (data_source_table) в качестве столбца верхнего предела можно использовать столбец LastModifytime.

    PersonID	Name            LastModifytime
    1           aaaa            2017-09-01 00:56:00.000
    2           bbbb            2017-09-02 05:23:00.000
    3           cccc            2017-09-03 02:36:00.000
    4           dddd            2017-09-04 03:21:00.000
    5           eeee            2017-09-05 08:06:00.000
    6           fffffff         2017-09-06 02:23:00.000
    7           gggg            2017-09-07 09:01:00.000
    8           hhhh            2017-09-08 09:01:00.000
    9           iiiiiiiii       2017-09-09 09:01:00.000
    
  2. Создайте контрольную таблицу в экземпляре SQL Server или Базы данных SQL Azure, чтобы хранить значение верхнего предела для разностной загрузки данных. В следующем примере имя контрольной таблицы — watermarktable. В этой таблице WatermarkValue — это столбец, в котором хранится значение верхнего предела, а его тип — datetime.

    create table watermarktable
    (
    WatermarkValue datetime,
    );
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM')
    
  3. Создайте хранимую процедуру в том же экземпляре SQL Server или Базы данных SQL Azure, который использовался для создания контрольной таблицы. Эта хранимая процедура используется для записи нового значения верхнего предела во внешнюю контрольную таблицу для разностной загрузки данных в следующий раз.

    CREATE PROCEDURE update_watermark @LastModifiedtime datetime
    AS
    
    BEGIN
    
        UPDATE watermarktable
        SET [WatermarkValue] = @LastModifiedtime 
    
    END
    
  4. Перейдите к шаблону разностного копирования из базы данных. Выберите Создать, чтобы создать подключение к базе данных-источнику, из которой нужно скопировать данные.

    Screenshot showing the creation of a new connection to the source table.

  5. Выберите Создать, чтобы создать подключение к целевой базе данных, в которую нужно скопировать данные.

    Screenshot showing the creation of a new connection to the destination table.

  6. Выберите Создать, чтобы создать подключение к внешней контрольной таблице и хранимой процедуре, созданным на шагах 2 и 3.

    Screenshot showing the creation of a new connection to the control table data store.

  7. Выберите Использовать этот шаблон.

  8. Вы увидите доступный конвейер, как показано в следующем примере:

    Screenshot showing the pipeline.

  9. Выберите Хранимая процедура. В качестве имени хранимой процедуры выберите [dbo].[update_watermark]. Выберите Import parameter (Параметр импорта), а затем выберите Добавить динамическое содержимое.

    Screenshot showing where to set the stored procedure activity.

  10. Запишите содержимое {activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue}, после чего щелкните Готово.

    Screenshot showing where to write the content for the parameters of the stored procedure.

  11. Выберите Отладка, введите Параметры, а затем нажмите Готово.

    Screenshot showing the Debug button.

  12. Отобразятся результаты, как в следующем примере:

    Sreenshot showing the result of the pipeline run.

  13. Вы можете создать новые строки в исходной таблице. Ниже приведен пример кода SQL для создания новых строк:

    INSERT INTO data_source_table
    VALUES (10, 'newdata','9/10/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
    
  14. Чтобы снова запустить конвейер, выберите Отладка, укажите Параметры и щелкните Готово.

    Вы увидите, что в место назначения будут скопированы только новые строки.

  15. (Необязательно.) Если в качестве места назначения данных выбрана служба Azure Synapse Analytics, необходимо указать подключение к хранилищу BLOB-объектов Azure для промежуточного хранения в соответствии с требованиями Azure Synapse Analytics Polybase. Шаблон автоматически создаст путь к контейнеру. После выполнения конвейера проверьте, создан ли контейнер в хранилище BLOB-объектов.

    Screenshot showing where to configure Polybase.