Добавочная загрузка данных из Базы данных SQL Azure в хранилище BLOB-объектов Azure с использованием сведений об отслеживания изменений и PowerShell

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Из этого руководства вы узнаете, как создать Фабрику данных Azure с конвейером, который загружает в Хранилище BLOB-объектов Azure разностные данные на основе сведений об отслеживании изменений из базы данных-источника в службе "База данных SQL Azure".

В этом руководстве вы выполните следующие шаги:

  • подготовите исходное хранилище данных;
  • Создали фабрику данных.
  • Создали связанные службы.
  • создадите источник, приемник и наборы данных отслеживания изменений;
  • создадите, запустите и начнете мониторинг конвейера полного копирования;
  • добавите или обновите данные в исходной таблице;
  • создаете, запустите и начнете мониторинг конвейера добавочного копирования.

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.

Обзор

В решениях для интеграции данных после начальной загрузки данных широко используется добавочная загрузка. В некоторых случаях измененные за определенный период времени данные в исходном хранилище данных можно легко разделить на такие срезы, как LastModifyTime или CreationTime. В некоторых случаях определить разностные данные, полученные в ходе последней обработки данных, нет возможности. Для этого используется технология "Отслеживание изменений", поддерживаемая такими хранилищами данных, как База данных Azure SQL и SQL Server. В этом руководстве описано, как использовать службу "Фабрика данных Azure" с технологией "Отслеживание изменений" (SQL) для добавочной загрузки разностных данных из Базы данных SQL Azure в хранилище BLOB-объектов Azure. Дополнительные сведения о технологии "Отслеживание изменений" (SQL) см. в статье Об отслеживании изменений (SQL Server).

Комплексный рабочий процесс

Ниже описан стандартный рабочий процесс по добавочной загрузке данных с помощью технологии "Отслеживание изменений".

Примечание.

И База данных SQL Azure, и SQL Server поддерживают технологию "Отслеживание изменений". В этом руководстве в качестве исходного хранилища данных используется база данных Azure SQL. Можете также использовать экземпляр SQL Server.

  1. Начальная загрузка исторических данных (один раз):
    1. Включите технологию "Отслеживание изменений" в базе данных-источнике, хранящейся в Базе данных SQL Azure.
    2. Получите начальное значение SYS_CHANGE_VERSION в базе данных в качестве базовых показателей для записи измененных данных.
    3. Загрузите все данные из базы данных-источника в Хранилище BLOB-объектов Azure.
  2. Добавочная загрузка разностных данных по расписанию (периодически выполняется после начальной загрузки данных):
    1. Получите старое и новое значения SYS_CHANGE_VERSION.
    2. Загрузите разностные данные, присоединив первичные ключи измененных строк (для двух значений SYS_CHANGE_VERSION) из таблицы sys.change_tracking_tables, с данными в исходной таблице, а затем переместите разностные данные в целевое хранилище.
    3. Обновите SYS_CHANGE_VERSION для следующей загрузки разностных данных.

Общее решение

В этом руководстве описано, как создать два конвейера, которые выполняют следующие две операции:

  1. Начальная загрузка — вы создадите конвейер с действием копирования, которое копирует все данные из исходного хранилища данных (База данных Azure SQL) в целевое хранилище данных (хранилище BLOB-объектов Azure).

    Full loading of data

  2. Добавочная загрузка — вы создадите конвейер со следующими действиями для периодического выполнения:

    1. Создайте два действия поиска, чтобы получить старое и новое значения SYS_CHANGE_VERSION из базы данных SQL Azure и передать их в действие копирования.
    2. Создайте одно действие копирования, чтобы скопировать вставленные, обновленные или удаленные данные, связанные с двумя значениями SYS_CHANGE_VERSION, из Базы данных SQL Azure в хранилище BLOB-объектов.
    3. Создайте одно действие хранимой процедуры, чтобы обновить значение SYS_CHANGE_VERSION для запуска следующего конвейера.

    Increment load flow diagram

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Необходимые компоненты

  • Azure PowerShell. Чтобы установить модули Azure PowerShell, выполните инструкции из статьи Установка и настройка Azure PowerShell.
  • База данных SQL Azure. Используйте базу данных как исходное хранилище данных. Если у вас нет базы данных в службе "База данных SQL Azure", вы можете создать ее, выполнив инструкции из статьи Создание отдельной базы данных в Базе данных SQL Azure.
  • Учетная запись хранения Azure. В этом руководстве в качестве приемника будет использоваться хранилище BLOB-объектов. Если у вас нет учетной записи хранения Azure, ознакомьтесь с разделом Создание учетной записи хранения. Создайте контейнер с именем adftutorial.

Создание таблицы источника данных в базе данных

  1. Запустите SQL Server Management Studio и подключитесь к Базе данных SQL.

  2. В обозревателе сервера щелкните правой кнопкой мыши базу данных и выберите Создать запрос.

  3. Выполните указанную ниже команду SQL для базы данных, чтобы создать таблицу с именем data_source_table в качестве исходного хранилища данных.

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. Включите функцию Отслеживание изменений для базы данных и исходной таблицы (data_source_table), выполнив следующий SQL-запрос:

    Примечание.

    • Замените <your database name> именем базы данных, которая содержит таблицу data_source_table.
    • Измененные данные в текущем примере хранятся в течение двух дней. Если загружать измененные данные каждые три дня или реже, некоторые измененные данные не будут включены. В таком случае вам нужно увеличить значение CHANGE_RETENTION. В противном случае нужно следить за тем, чтобы период для загрузки измененных данных не превышал два дня. Дополнительные сведения см. в разделе Включение отслеживания изменений для базы данных.
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. Создайте таблицу и сохраните стандартное значение ChangeTracking_version, выполнив следующий запрос:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    Примечание.

    Если после включения отслеживания изменений для базы данных SQL данные не меняются, значение ChangeTracking_version будет равно 0.

  6. Выполните следующий запрос, чтобы создать хранимую процедуру в базе данных. Конвейер вызывает эту хранимую процедуру для обновления значения ChangeTracking_version в таблице, созданной на предыдущем шаге.

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
    

Azure PowerShell

Чтобы установить модули Azure PowerShell, выполните инструкции из статьи Установка и настройка Azure PowerShell.

Создание фабрики данных

  1. Определите переменную для имени группы ресурсов, которую в дальнейшем можно будет использовать в командах PowerShell. Скопируйте текст следующей команды в PowerShell, укажите имя группы ресурсов Azure в двойных кавычках, а затем выполните команду. Например: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Если группа ресурсов уже существует, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  2. Определите переменную для расположения фабрики данных:

    $location = "East US"
    
  3. Чтобы создать группу ресурсов Azure, выполните следующую команду:

    New-AzResourceGroup $resourceGroupName $location
    

    Если группа ресурсов уже существует, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  4. Определите переменную для имени фабрики данных.

    Важно!

    Измените имя фабрики данных, чтобы оно было глобально уникальным.

    $dataFactoryName = "IncCopyChgTrackingDF";
    
  5. Чтобы создать фабрику данных, выполните командлет Set-AzDataFactoryV2.

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Обратите внимание на следующие аспекты:

  • Имя фабрики данных Azure должно быть глобально уникальным. Если появляется следующая ошибка, измените имя и повторите попытку.

    The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
    
  • Чтобы создать экземпляры фабрики данных, нужно назначить учетной записи пользователя, используемой для входа в Azure, роль участника, владельца либо администратора подписки Azure.

  • Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, база данных SQL Azure и т. д.) и вычисления (HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

Создание связанных служб

Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В этом разделе объясняется, как создать связанные службы для учетной записи хранения Azure и Базы данных SQL Azure.

Создание связанной службы хранилища Azure

На этом шаге вы свяжете учетную запись хранения Azure с фабрикой данных.

  1. Создайте JSON-файл с именем AzureStorageLinkedService.json в папке C:\ADFTutorials\IncCopyChangeTrackingTutorial и добавьте в него следующее (если эта папка отсутствует, создайте ее): Перед сохранением файла замените значения <accountName> и <accountKey> именем своей учетной записи хранения Azure и ее ключом.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. В Azure PowerShell перейдите к папке C:\ADFTutorials\IncCopyChangeTrackingTutorial.

  3. Выполните командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу: Azure служба хранилища LinkedService. В указанном ниже примере вы передадите значения для параметров ResourceGroupName и DataFactoryName.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Создание связанной службы Базы данных SQL Azure

На этом шаге вы свяжете базу данных с фабрикой данных.

  1. Создайте JSON-файл с именем AzureSQLDatabaseLinkedService.json в папке C:\ADFTutorials\IncCopyChangeTrackingTutorial и добавьте в него следующее. Прежде чем сохранить файл, вместо значений <server>, <database name>, <user id> и <password> укажите имя сервера, имя базы данных, идентификатор пользователя и пароль.

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.windows.net,1433;Initial Catalog=<database name>; Persist Security Info=False; User ID=<user name>; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. В Azure PowerShell запустите командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу: AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Создайте наборы данных.

На этом шаге вы создадите наборы данных, которые представляют данные источника и приемника, а также передадите в хранилище значение SYS_CHANGE_VERSION.

Создание исходного набора данных

На этом шаге вы создадите набор данных для представления исходных данных.

  1. Создайте файл JSON с именем SourceDataset.json в той же папке со следующим содержимым:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    
  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных: SourceDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SourceDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Создание набора данных приемника

На этом шаге вы создадите набор данных для представления данных, которые копируются из исходного хранилища данных.

  1. Создайте файл JSON с именем SinkDataset.json в той же папке со следующим содержимым:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incchgtracking",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Контейнер adftutorial создается в хранилище BLOB-объектов Azure как необходимый компонент. Создайте контейнер (если его еще нет) или присвойте ему имя имеющегося контейнера. В этом руководстве имя выходного файла создается динамически с помощью выражения @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных: SinkDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SinkDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
    

Создание набора данных для отслеживания изменений

На этом шаге вы создадите набор данных для хранения значения ChangeTracking_version.

  1. Создайте в той же папке JSON-файл с именем ChangeTrackingDataset.json со следующим содержимым:

    {
        "name": " ChangeTrackingDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "table_store_ChangeTracking_version"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Таблица table_store_ChangeTracking_version создается как необходимый компонент.

  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных: ChangeTrackingDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : ChangeTrackingDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Создание конвейера для полного копирования данных

На этом этапе вы создадите конвейер с действием копирования, которое копирует все данные из исходного хранилища данных (База данных Azure SQL) в целевое хранилище данных (хранилище BLOB-объектов Azure).

  1. Создайте JSON-файл FullCopyPipeline.json в той же папке со следующим содержимым:

    {
        "name": "FullCopyPipeline",
        "properties": {
            "activities": [{
                "name": "FullCopyActivity",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
    
                "inputs": [{
                    "referenceName": "SourceDataset",
                    "type": "DatasetReference"
                }],
                "outputs": [{
                    "referenceName": "SinkDataset",
                    "type": "DatasetReference"
                }]
            }]
        }
    }
    
  2. Выполните командлет Set-AzDataFactoryV2Pipeline, чтобы создать конвейер: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
    

    Пример выходных данных:

     PipelineName      : FullCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {FullCopyActivity}
     Parameters        :
    

Запуск конвейера полного копирования

Запустите конвейер: FullCopyPipeline с помощью командлета Invoke-AzDataFactoryV2Pipeline.

Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName        

Мониторинг конвейера полного копирования

  1. Войдите на портал Azure.

  2. Щелкните Все службы, выполните поиск по ключевому слову data factories и выберите Фабрики данных.

    Data factories menu

  3. Найдите и выберите в списке свою фабрику данных, чтобы открыть страницу "Фабрика данных".

    Search for your data factory

  4. На странице фабрики данных, щелкните плитку Мониторинг и управление.

    Monitor & Manage tile

  5. Приложение Интеграция данных запускается на отдельной вкладке. Все запуски конвейера и их состояния можно просмотреть. Обратите внимание, что в следующем примере состояние выполнения конвейера имеет значение Успешно. Чтобы проверить параметры, переданные в конвейер, перейдите по ссылке в столбце Параметры. Если произошла ошибка, вы увидите ссылку в столбце Ошибка. Щелкните ссылку в столбце Действия.

    Screenshot shows pipeline runs for a data factory.

  6. Если щелкнуть ссылку в столбце Действия, вы увидите следующую страницу. На ней отображаются все выполняемые действия в конвейере.

    Screenshot shows activity runs for a data factory with the Pipelines link called out.

  7. Чтобы вернуться к представлению Запуски конвейера, щелкните ссылку Конвейеры, как показано на рисунке.

Проверьте результаты.

Вы увидите файл с именем incremental-<GUID>.txt в папке incchgtracking контейнера adftutorial.

Output file from full copy

Файл должен включать данные из базы данных:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

Добавление данных в исходную таблицу

Выполните следующий запрос к базе данных, чтобы добавить и обновить строку.

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

Создание конвейера для копирования разностных данных

В этом шаге вы создадите конвейер со следующими действиями для периодического выполнения: Действия поиска получают старое и новое значения SYS_CHANGE_VERSION из базы данных SQL Azure и передают их в действие копирования. Действие копирования копирует вставленные, обновленные или удаленные данные, связанные с двумя значениями SYS_CHANGE_VERSION, из Базы данных SQL Azure в хранилище BLOB-объектов. Действие хранимой процедуры обновляет значение SYS_CHANGE_VERSION для запуска следующего конвейера.

  1. Создайте JSON-файл IncrementalCopyPipeline.json в той же папке со следующим содержимым:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupLastChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from table_store_ChangeTracking_version"
                        },
                        "dataset": {
                            "referenceName": "ChangeTrackingDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupCurrentChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion"
                        },
                        "dataset": {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupLastChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupCurrentChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "StoredProceduretoUpdateChangeTrackingActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
                        "storedProcedureName": "Update_ChangeTracking_Version",
                        "storedProcedureParameters": {
                            "CurrentTrackingVersion": {
                                "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}",
                                "type": "INT64"
                            },
                            "TableName": {
                                "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}",
                                "type": "String"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. Выполните командлет Set-AzDataFactoryV2Pipeline, чтобы создать конвейер: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Пример выходных данных:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity}
     Parameters        :
    

Запуск конвейера добавочного копирования

Запустите конвейер: incrementalCopyPipeline с помощью командлета Invoke-AzDataFactoryV2Pipeline .

Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName     

Мониторинг конвейера добавочного копирования

  1. В приложении интеграции данных обновите представление запусков конвейера. Вы должны увидеть в списке конвейер IncrementalCopyPipeline. Щелкните ссылку в столбце Действия.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  2. Если щелкнуть ссылку в столбце Действия, вы увидите следующую страницу. На ней отображаются все выполняемые действия в конвейере.

    Screenshot shows pipeline runs for a data factory with several marked as succeeded.

  3. Чтобы вернуться к представлению Запуски конвейера, щелкните ссылку Конвейеры, как показано на рисунке.

Проверьте результаты.

Вы увидите второй файл в папке incchgtracking контейнера adftutorial.

Output file from incremental copy

Файл должен включать только разностные данные из базы данных. Запись с U — это обновленная строка в базе данных, а I — добавленная строка.

1,update,10,2,U
6,new,50,1,I

Первые три столбца — это измененные данные из таблицы data_source_table. Последние два столбцы — это метаданные из таблицы sys.change_tracking_tables. Четвертый столбец — это версия SYS_CHANGE_VERSION для каждой изменившейся строки. Пятый столбец — это операция: U = update, I = insert. Дополнительные сведения об отслеживании изменений см. в описании функции CHANGETABLE.

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

В этом руководстве рассказывается о копировании новых и измененных файлов на основе параметра LastModifiedDate:

Copy new files by lastmodifieddate (Копирование новых файлов с использованием параметра LastModifiedDate)