Добавочная загрузка данных из нескольких таблиц в SQL Server в Базу данных SQL Azure с использованием PowerShell

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этом кратком руководстве показано, как создать фабрику данных Azure с конвейером, который загружает разностные данные из нескольких таблиц базы данных SQL Server в Базу данных SQL Azure.

В этом руководстве вы выполните следующие шаги:

  • подготовите исходное и конечное хранилища данных;
  • Создали фабрику данных.
  • Создание локальной среды выполнения интеграции.
  • Установка среды выполнения интеграции.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • создадите и запустите конвейер, а также начнете его мониторинг;
  • Проверка результатов.
  • Добавление или обновление данных в исходных таблицах.
  • Повторный запуск конвейера и выполнение его мониторинга.
  • Просмотр окончательных результатов.

Обзор

Ниже приведены важные действия для создания этого решения.

  1. Выберите столбец для предела.

    Выберите один столбец из каждой таблицы в исходном хранилище данных, который можно использовать для идентификации новых или обновленных записей при каждом запуске. Как правило, данные в этом выбранном столбце (например, последнее_время_изменения или идентификатор) продолжают увеличиваться по мере создания или обновления строк. В качестве предела используется максимальное значение в этом столбце.

  2. Подготовьте хранилище данных для хранения значений предела.

    В этом руководстве вы сохраните значение предела в базе данных SQL.

  3. Создайте конвейер, следуя инструкциям ниже.

    1. Создайте действие ForEach, которое выполняет итерацию по списку имен исходной таблицы. Этот список передается в конвейер в качестве параметра. В каждой исходной таблице этот параметр вызывает следующие действия для загрузки разностных данных для этой таблицы.

    2. Создание двух действий поиска. Используйте первое действие поиска для получения последнего значения предела, а второе для получения нового значения предела. Эти значения передаются в действие копирования.

    3. Создайте действие копирования, копирующее строки из исходного хранилища данных со значениями столбцов предела, которые выше значений старого предела и меньше или равно значениям нового. Затем оно копирует разностные данные из исходного хранилища данных в хранилище BLOB-объектов Azure в качестве нового файла.

    4. Создайте действие хранимой процедуры, которое обновляет значение предела для конвейера при последующем выполнении.

    Ниже приведена общая схема решения.

    Incrementally load data

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Необходимые компоненты

  • SQL Server. В этом учебнике используйте базу данных SQL Server в качестве исходного хранилища данных.
  • База данных SQL Azure. Базу данных в службе "База данных SQL Azure" следует использовать в качестве принимающего хранилища данных. Если у вас нет базы данных SQL, создайте ее, следуя указаниям из руководства Создание отдельной базы данных в Базе данных SQL Azure.

Создание исходных таблиц в базе данных SQL Server

  1. Откройте SQL Server Management Studio (SSMS) или Azure Data Studio и подключитесь к базе данных SQL Server.

  2. В Обозревателе сервера (SSMS) или в Области подключения (Azure Data Studio), щелкните правой кнопкой мыши на базу данных и выберите Новый запрос.

  3. Выполните следующую команду SQL в базе данных, чтобы создать таблицы с именами customer_table и project_table.

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Создание целевых таблиц в базе данных SQL Azure

  1. Откройте SQL Server Management Studio (SSMS) или Azure Data Studio и подключитесь к базе данных SQL Server.

  2. В Обозревателе сервера (SSMS) или в Области подключения (Azure Data Studio), щелкните правой кнопкой мыши на базу данных и выберите Новый запрос.

  3. Выполните следующую команду SQL в базе данных, чтобы создать таблицы с именами customer_table и project_table.

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Создание дополнительной таблицы в Базе данных SQL Azure для хранения значения верхнего предела

  1. Выполните указанную ниже команду SQL для базы данных, чтобы создать таблицу с именем watermarktable для хранения значения предела.

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Вставьте исходные значения предела для обеих исходных таблиц в таблицу значений предела.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Создание хранимой процедуры в Базе данных SQL Azure

Выполните указанную ниже команду, чтобы создать хранимую процедуру в своей базе данных. Эта хранимая процедура обновляет значение предела после каждого запуска конвейера.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Создание типов данных и дополнительных хранимых процедур в Базе данных SQL Azure

Выполните следующий запрос, чтобы создать две хранимые процедуры и два типа данных в своей базе данных. Они используются для объединения данных из исходных в целевые таблицы.

Чтобы упростить начало работы, мы непосредственно используем эти хранимые процедуры, передающие разностные данные в табличной переменной, а затем объединяем их в целевое хранилище. Учтите, что для хранения в табличной переменной не ожидается большое число строк разностных данных (более 100).

Если нужно объединить большое количество строк разностных данных в целевом хранилище, мы рекомендуем с помощью действия копирования сначала скопировать разностные данные во временную "промежуточную" таблицу в целевом хранилище, а затем создать собственную хранимую процедуру, не используя табличную переменную для их объединения из "промежуточной" таблицы в "итоговую".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Чтобы установить новые модули Azure PowerShell, выполните инструкции из статьи Установка и настройка Azure PowerShell.

Создание фабрики данных

  1. Определите переменную для имени группы ресурсов, которую в дальнейшем можно будет использовать в командах PowerShell. Скопируйте текст следующей команды в PowerShell, укажите имя группы ресурсов Azure в двойных кавычках, а затем выполните команду. Например, "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Если группа ресурсов уже имеется, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  2. Определите переменную для расположения фабрики данных.

    $location = "East US"
    
  3. Чтобы создать группу ресурсов Azure, выполните следующую команду:

    New-AzResourceGroup $resourceGroupName $location
    

    Если группа ресурсов уже имеется, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  4. Определите переменную для имени фабрики данных.

    Важно!

    Измените имя фабрики данных, чтобы сделать его глобально уникальным. Например, ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Чтобы создать фабрику данных, выполните командлет Set-AzDataFactoryV2.

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Обратите внимание на следующие аспекты:

  • Имя фабрики данных должно быть глобально уникальным. Если появляется следующая ошибка, измените имя и повторите попытку.

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Чтобы создать экземпляры фабрики данных, нужно назначить учетной записи пользователя, используемой для входа в Azure, роль участника, владельца либо администратора подписки Azure.

  • Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, База данных SQL, Управляемый экземпляр SQL и т. д.) и вычислительные ресурсы (Azure HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

Создание локальной среды выполнения интеграции

В этом разделе вы создадите локальную среду выполнения интеграции и свяжете ее с локальным компьютером, на котором находится база данных SQL Server. Локальная среда выполнения интеграции — это компонент, который копирует данные с SQL Server на компьютере в Базу данных SQL Azure.

  1. Создайте переменную для имени среды выполнения интеграции. Используйте уникальное имя и запишите его. Оно будет использоваться далее в этом руководстве.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Создание локальной среды выполнения интеграции.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Пример выходных данных:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Чтобы получить состояние созданной среды выполнения интеграции, выполните следующую команду. Убедитесь, что свойству State присвоено значение NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Пример выходных данных:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Чтобы получить ключи проверки подлинности для регистрации локальной среды выполнения интеграции в службе фабрики данных Azure в облаке, выполните следующую команду:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Пример выходных данных:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Скопируйте один из ключей (без двойных кавычек), используемый для регистрации локальной среды выполнения интеграции, которую вы установите на компьютер на следующих шагах.

Установка средства среды выполнения интеграции

  1. Если на вашем компьютере уже установлена среда выполнения интеграции, удалите ее в разделе Установка и удаление программ.

  2. Скачайте локальную среду выполнения интеграции на локальный компьютер под управлением Windows. Запустите установку.

  3. На странице приветствия мастера установки Microsoft Integration Runtime нажмите кнопку Далее.

  4. На странице Лицензионное соглашение примите условия использования и лицензионное соглашение и нажмите кнопку Далее.

  5. На странице Конечная папка нажмите кнопку Далее.

  6. На странице Ready to install Microsoft Integration Runtime (Все готово для установки Microsoft Integration Runtime) нажмите кнопку Установить.

  7. На странице Completed the Microsoft Integration Runtime Setup (Мастер установки Microsoft Integration Runtime завершил работу) нажмите кнопку Готово.

  8. На странице Регистрация Integration Runtime (Self-hosted) вставьте ключ, созданный в предыдущем разделе, и выберите Зарегистрировать.

    Register the integration runtime

  9. На странице Новый узел среды выполнения интеграции (с локальным размещением) нажмите кнопку Finish (Завершить)

  10. Когда локальная среда выполнения интеграции будет успешно зарегистрирована, вы увидите следующее сообщение:

    Registered successfully

  11. На странице Регистрация Integration Runtime (Self-hosted) выберите Запустить Configuration Manager.

  12. Когда узел будет подключен к облачной службе, отобразится следующая страница:

    Node is connected page

  13. Теперь проверьте возможность подключения к базе данных SQL Server.

    Diagnostics tab

    a. На странице Configuration Manager перейдите на вкладку Diagnostics (Диагностика).

    b. В качестве типа источника данных выберите SqlServer.

    c. Введите имя сервера.

    d. Введите имя базы данных.

    д) Выберите режим проверки подлинности.

    е) Введите имя пользователя.

    ж. Введите пароль, связанный с именем пользователя.

    h. Нажмите кнопку проверки, чтобы убедиться, что эта среда выполнения интеграции может подключаться к серверу SQL Server. Если подключение установлено успешно, появится зеленый флажок. Если подключение не установлено, появится сообщение об ошибке. Исправьте ошибки и проверьте, может ли среда выполнения интеграции подключаться к SQL Server.

    Примечание.

    Запишите значения типа проверки подлинности, сервера, базы данных, пользователя и пароль. Они будут использоваться далее в этом руководстве.

Создание связанных служб

Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В рамках этого раздела вы создадите связанные службы для базы данных SQL Server и базы данных в службе "База данных SQL Azure".

Создание связанной службы SQL Server

На этом шаге вы свяжете базу данных SQL Server с фабрикой данных.

  1. Создайте JSON-файл с именемSqlServerLinkedService.json в папке C:\ADFTutorials\IncCopyMultiTableTutorial (создайте локальные папки, если они не существуют) со следующим содержимым. Выберите правильный раздел в зависимости от типа проверки подлинности, который используется для подключения к SQL Server.

    Важно!

    Выберите правильный раздел в зависимости от типа проверки подлинности, который используется для подключения к SQL Server.

    Если используется проверка подлинности SQL, скопируйте следующее определение JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Если используется проверка подлинности Windows, скопируйте следующее определение JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Важно!

    • Выберите правильный раздел в зависимости от типа проверки подлинности, который используется для подключения к SQL Server.
    • Замените <integration runtime name> именем своей среды выполнения интеграции.
    • Перед сохранением файла замените <servername>, <databasename>, <username> и <password> значениями имени сервера, базы данных, пользователя и пароля для базы данных SQL Server.
    • Если в имени учетной записи пользователя или имени сервера необходимо использовать символ косой черты (\), добавьте escape-символ (\). Например, mydomain\\myuser.
  2. В PowerShell запустите следующий командлет, чтобы перейти к папке C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Выполните командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу AzureStorageLinkedService. В указанном ниже примере вы передадите значения для параметров ResourceGroupName и DataFactoryName.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Создание связанной службы Базы данных SQL

  1. Создайте JSON-файл с именемAzureSQLDatabaseLinkedService.json в папке C:\ADFTutorials\IncCopyMultiTableTutorial и добавьте в него следующее содержимое. (Создайте папку ADF, если она еще не существует.) Замените <имя> сервера, имя> базы данных, <<имя> пользователя и <пароль> именем базы данных SQL Server, именем базы данных, именем пользователя и паролем перед сохранением файла.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. В PowerShell выполните командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Создайте наборы данных.

На этом шаге вы создадите наборы данных для представления источника данных, назначение данных и место для хранения предела.

Создание исходного набора данных

  1. Создайте файл JSON с именем SourceDataset.json в той же папке со следующим содержимым:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Действие копирования в конвейере использует SQL-запрос для загрузки данных, вместо того чтобы загружать всю таблицу.

  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Создание набора данных приемника

  1. Создайте файл JSON с именем SinkDataset.json в той же папке со следующим содержимым. Элемент tableName динамически задается конвейером в среде выполнения. Действие ForEach в конвейере выполняет итерацию по списку имен таблиц и передает имя таблицы для этого набора данных в каждой итерации.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Создание набора данных для предела

На этом шаге вы создадите набор данных для хранения значения верхнего предела.

  1. Создайте файл JSON с именем WatermarkDataset.json в той же папке со следующим содержимым:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Создание конвейера

Этот конвейер принимает список имен таблиц в качестве параметра. Действие ForEach выполняет итерацию по списку имен таблиц, а затем выполняет следующие операции:

  1. Использует действие поиска, чтобы получить старое значение предела (начальное значение или значение, используемое в последней итерации).

  2. Использует действие поиска, чтобы получить новое значение предела (максимальное значение в столбце предела в исходной таблице).

  3. Использует действие копирования, чтобы скопировать данные между двумя значениями пределов из исходной в целевую базу данных.

  4. Использует действие хранимой процедуры, чтобы обновить старое значение предела для использования на первом шаге следующей итерации.

Создание конвейера

  1. Создайте JSON-файл с именем IncrementalCopyPipeline.json в той же папке со следующим содержимым:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Выполните командлет Set-AzDataFactoryV2Pipeline, чтобы создать конвейер IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Пример выходных данных:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Запуск конвейера

  1. Создайте файл параметров с именем Parameters.json в той же папке со следующим содержимым:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Запустите конвейер IncrementalCopyPipeline, выполнив командлет Invoke-AzDataFactoryV2Pipeline. Замените заполнители собственными именами группы ресурсов и фабрики данных.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Мониторинг конвейера

  1. Войдите на портал Azure.

  2. Выберите Все службы, выполните поиск по фразе Фабрики данных и выберите Фабрики данных.

  3. Найдите и выберите в списке свою фабрику данных, чтобы открыть страницу Фабрика данных.

  4. На странице Фабрика данных выберите Открыть на плитке Открыть студию Фабрики данных Azure, чтобы запустить пользовательский интерфейс Фабрики данных на отдельной вкладке.

  5. На домашней странице Фабрики данных Azure выберите слева Монитор.

    Screenshot shows the home page for Azure Data Factory.

  6. Вы можете увидеть все запуски конвейеров и их состояние. Обратите внимание, что в следующем примере состояние выполнения конвейера имеет значение Успешно. Чтобы проверить параметры, переданные в конвейер, щелкните ссылку в столбце Параметры. Если произошла ошибка, вы увидите ссылку в столбце Ошибка.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  7. Если щелкнуть ссылку в столбце Действия, вы увидите все выполняемые действия в конвейере.

  8. Выберите All Pipeline Runs (Все запуски конвейера), чтобы вернуться к представлению Pipeline Runs (Запуски конвейера).

Проверьте результаты.

В SQL Server Management Studio выполните следующие запросы в целевой базе данных SQL. Так вы проверите, что данные скопированы из исходных таблиц в целевые.

Запрос

select * from customer_table

Выходные данные

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Запрос

select * from project_table

Выходные данные

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Запрос

select * from watermarktable

Выходные данные

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Обратите внимание, что значения предела для обеих таблиц обновились.

Добавление данных в исходные таблицы

Выполните следующий запрос в исходной базе данных SQL Server, чтобы обновить имеющуюся строку в таблице customer_table. Вставьте новую строку в таблицу project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Повторный запуск конвейера

  1. Теперь снова запустите конвейер. Для этого выполните следующую команду PowerShell:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Чтобы отслеживать выполнение конвейера, следуйте инструкциям из раздела Мониторинг конвейера. Когда конвейер находится в состоянии Выполняется, отображается ссылка на другое действие в столбце Действия, с помощью которой можно отменить выполнение конвейера.

  3. Выберите Обновить, чтобы обновить список, пока конвейер не будет выполнен.

  4. (Необязательно.) Щелкните ссылку View Activity Runs (Просмотреть выполнения действий) в области Действия, чтобы просмотреть все выполнения действий, связанные с этим запуском конвейера.

Просмотр окончательных результатов

В SQL Server Management Studio выполните следующие запросы в целевой базе данных SQL Azure. Так вы проверите, что обновленные и новые данные скопированы из исходных таблиц в целевые.

Запрос

select * from customer_table

Выходные данные

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Обратите внимание на новые значения в столбцах Name и LastModifytime для строки под номером 3 в столбце PersonID.

Запрос

select * from project_table

Выходные данные

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Обратите внимание, что запись NewProject добавлена в таблицу project_table.

Запрос

select * from watermarktable

Выходные данные

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Обратите внимание, что значения предела для обеих таблиц обновились.

В этом руководстве вы выполнили следующие шаги:

  • подготовите исходное и конечное хранилища данных;
  • Создали фабрику данных.
  • Создание локальной среды выполнения интеграции (IR).
  • Установка среды выполнения интеграции.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • создадите и запустите конвейер, а также начнете его мониторинг;
  • Проверка результатов.
  • Добавление или обновление данных в исходных таблицах.
  • Повторный запуск конвейера и выполнение его мониторинга.
  • Просмотр окончательных результатов.

Перейдите к следующему руководству, чтобы узнать о преобразовании данных с помощью кластера Spark в Azure: