Пошаговая загрузка данных из Базы данных SQL Azure в Хранилище BLOB-объектов Azure с помощью PowerShell

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этом руководстве показано, как с помощью Фабрики данных Azure создать конвейер, который загружает разностные данные из таблицы в службе "База данных SQL Azure" в Хранилище BLOB-объектов Azure.

В этом руководстве вы выполните следующие шаги:

  • Подготовили хранилище данных для хранения значений предела.
  • Создали фабрику данных.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • Создание конвейера.
  • Запуск конвейера.
  • Осуществили мониторинг выполнения конвейера.

Обзор

Ниже приведена общая схема решения.

Incrementally load data

Ниже приведены важные действия для создания этого решения.

  1. Выберите столбец для предела. Выберите один столбец в исходном хранилище данных, который можно использовать для создания среза новых или обновленных записей при каждом запуске. Как правило, данные в этом выбранном столбце (например, последнее_время_изменения или идентификатор) продолжают увеличиваться по мере создания или обновления строк. В качестве предела используется максимальное значение в этом столбце.

  2. Подготовьте хранилище данных для хранения значений предела.
    В этом руководстве вы сохраните значение предела в базе данных SQL.

  3. Создайте конвейер, используя следующий рабочий процесс:

    Конвейер в этом решении содержит следующие действия:

    • Создайте два действия поиска. Используйте первое действие поиска для получения последнего значения предела, а второе для получения нового значения предела. Эти значения передаются в действие копирования.
    • Создайте действие копирования, копирующее строки из исходного хранилища данных со значениями столбцов предела, которые выше значений старого предела и меньше или равно значениям нового. Затем оно копирует разностные данные из исходного хранилища данных в хранилище BLOB-объектов в качестве нового файла.
    • Создайте действие хранимой процедуры, которое обновляет значение предела для конвейера при последующем выполнении.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Предварительные требования

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.

Создание таблицы источника данных в базе данных SQL

  1. Откройте SQL Server Management Studio. В обозревателе сервера щелкните правой кнопкой мыши базу данных и выберите Создать запрос.

  2. Выполните указанную ниже команду SQL для базы данных SQL, чтобы создать таблицу с именем data_source_table в качестве хранилища источника данных.

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    В этом руководстве в качестве столбца предела мы будем использовать LastModifytime. В следующей таблице показаны данные в хранилище источника данных.

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Создание дополнительной таблицы в базе данных SQL для хранения значения верхнего предела

  1. Выполните указанную ниже команду SQL для базы данных SQL, чтобы создать таблицу с именем watermarktable для хранения значения предела.

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Задайте значение по умолчанию верхнего предела, используя имя таблицы исходного хранилища данных. В этом руководстве используется таблица с именем data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Просмотрите данные в таблице watermarktable.

    Select * from watermarktable
    

    Выходные данные:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Создание хранимой процедуры в базе данных SQL

Выполните указанную ниже команду, чтобы создать хранимую процедуру в базе данных SQL.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Создание фабрики данных

  1. Определите переменную для имени группы ресурсов, которую в дальнейшем можно будет использовать в командах PowerShell. Скопируйте текст следующей команды в PowerShell, укажите имя группы ресурсов Azure в двойных кавычках, а затем выполните команду. Например, "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Если группа ресурсов уже имеется, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  2. Определите переменную для расположения фабрики данных.

    $location = "East US"
    
  3. Чтобы создать группу ресурсов Azure, выполните следующую команду:

    New-AzResourceGroup $resourceGroupName $location
    

    Если группа ресурсов уже имеется, вы можете не перезаписывать ее. Назначьте переменной $resourceGroupName другое значение и еще раз выполните команду.

  4. Определите переменную для имени фабрики данных.

    Важно!

    Измените имя фабрики данных, чтобы сделать его глобально уникальным. Например, укажите ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Чтобы создать фабрику данных, выполните командлет Set-AzDataFactoryV2.

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Обратите внимание на следующие аспекты:

  • Имя фабрики данных должно быть глобально уникальным. Если появляется следующая ошибка, измените имя и повторите попытку.

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Чтобы создать экземпляры фабрики данных, нужно назначить учетной записи пользователя, используемой для входа в Azure, роль участника, владельца либо администратора подписки Azure.

  • Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища, База данных SQL, Управляемый экземпляр SQL Azure и т. д.) и вычислительные ресурсы (Azure HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

Создание связанных служб

Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В этом разделе показано, как создать связанные службы для учетной записи хранения и Базы данных SQL.

Создание связанной службы хранилища

  1. Создайте JSON-файл с именем AzureStorageLinkedService.json в папке C:\ADF со следующим содержимым. (Создайте папку ADF, если она еще не существует.) Замените <accountName> имя <accountKey> и ключ учетной записи хранения перед сохранением файла.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. В PowerShell перейдите в папку ADF.

  3. Выполните командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу AzureStorageLinkedService. В указанном ниже примере вы передадите значения для параметров ResourceGroupName и DataFactoryName.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Создание связанной службы базы данных SQL

  1. Создайте в папке C:\ADF файл JSON с именем AzureSQLDatabaseLinkedService.json со следующим содержимым. (Создайте папку ADF, если она еще не существует.) Замените сервер, базу данных, <идентификатор> пользователя и <пароль> именем сервера, базы данных>, идентификатора пользователя и пароля перед сохранением файла. <><

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.windows.net,1433;Initial Catalog=<database>; Persist Security Info=False; User ID=<user> ; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. В PowerShell перейдите в папку ADF.

  3. Выполните командлет Set-AzDataFactoryV2LinkedService, чтобы создать связанную службу AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Пример выходных данных:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Создайте наборы данных.

На этом шаге вы создадите наборы данных, которые представляют данные источника и приемника.

Создание исходного набора данных

  1. Создайте файл JSON с именем SourceDataset.json в той же папке со следующим содержимым:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    В этом руководстве используется таблица с именем data_source_table. Замените ее, если используется таблица с другим именем.

  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Создание набора данных приемника

  1. Создайте файл JSON с именем SinkDataset.json в той же папке со следующим содержимым:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Важно!

    В этом фрагменте кода предполагается, что у вас есть контейнер BLOB-объектов с именем adftutorial в хранилище BLOB-объектов. Создайте контейнер (если его еще нет) или присвойте ему имя имеющегося контейнера. Выходная папка incrementalcopy создается автоматически, если она не существует в контейнере. В этом руководстве имя файла создается динамически с помощью выражения @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Создание набора данных для предела

На этом шаге вы создадите набор данных для хранения значения верхнего предела.

  1. Создайте файл JSON с именем WatermarkDataset.json в той же папке со следующим содержимым:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Выполните командлет Set-AzDataFactoryV2Dataset, чтобы создать набор данных WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Вот пример выходных данных командлета:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Создание конвейера

В этом руководстве вы создадите конвейер с двумя действиями поиска, одним действием копирования и одним действием хранимой процедуры, связанными в одном конвейере.

  1. Создайте JSON-файл IncrementalCopyPipeline.json в той же папке со следующим содержимым:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Выполните командлет Set-AzDataFactoryV2Pipeline, чтобы создать конвейер IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Пример выходных данных:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

Запуск конвейера

  1. Запустите конвейер IncrementalCopyPipeline, выполнив командлет Invoke-AzDataFactoryV2Pipeline. Замените заполнители собственными именами группы ресурсов и фабрики данных.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Проверьте состояние конвейера, выполняя командлет Get-AzDataFactoryV2ActivityRun, пока не увидите, что все действия выполняются успешно. Замените заполнители нужными значениями времени для параметров RunStartedAfter и RunStartedBefore. В этом руководстве вы используете -RunStartedAfter "2017/09/14" и -RunStartedBefore "2017/09/15".

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Пример выходных данных:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

Проверьте результаты.

  1. В хранилище BLOB-объектов (хранилище приемника) вы должны увидеть, что данные скопированы в файл, заданный в SinkDataset. В этом руководстве в качестве имени файла используется Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Открыв файл, вы увидите, что записи в файле идентичны данным в базе данных SQL.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Проверьте последнее значение из watermarktable. Вы увидите, что значение предела было обновлено.

    Select * from watermarktable
    

    Пример выходных данных:

    TableName WatermarkValue
    data_source_table 2017-09-05 8:06:00.000

Добавление данных в хранилище источника данных для проверки загрузки разностных данных

  1. Добавьте новые данные в базу данных SQL (хранилище источника данных).

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    Обновленные данные в базе данных SQL выглядят следующим образом:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Повторно запустите конвейер IncrementalCopyPipeline, выполнив командлет Invoke-AzDataFactoryV2Pipeline. Замените заполнители собственными именами группы ресурсов и фабрики данных.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Проверьте состояние конвейера, выполняя командлет Get-AzDataFactoryV2ActivityRun, пока не увидите, что все действия выполняются успешно. Замените заполнители нужными значениями времени для параметров RunStartedAfter и RunStartedBefore. В этом руководстве вы используете -RunStartedAfter "2017/09/14" и -RunStartedBefore "2017/09/15".

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Пример выходных данных:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. В хранилище BLOB-объектов вы увидите, что был создан другой файл. В этом руководстве новый файл имеет имя Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Открыв этот файл, вы увидите записи двух строк.

  5. Проверьте последнее значение из watermarktable. Вы увидите, что значение предела было обновлено снова.

    Select * from watermarktable
    

    Пример выходных данных:

    TableName WatermarkValue
    data_source_table 2017-09-07 09:01:00.000

В этом руководстве вы выполнили следующие шаги:

  • Подготовили хранилище данных для хранения значений предела.
  • Создали фабрику данных.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • Создание конвейера.
  • Запуск конвейера.
  • Осуществили мониторинг выполнения конвейера.

В этом руководстве с помощью конвейера показано, как скопировать данные из одной таблицы в Базе данных SQL в Хранилище BLOB-объектов. Перейдите к следующему руководству, чтобы узнать о копировании данных из нескольких таблиц в Базе данных SQL Server в Базу данных SQL.