Поделиться через


Копирование данных из хранилища BLOB-объектов Azure в Базу данных SQL Azure с помощью фабрики данных Azure

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этом руководстве вы создадите фабрику данных с конвейером, который перемещает данные из хранилища BLOB-объектов Azure в Базу данных SQL Azure. Шаблон конфигурации в этом руководстве применяется к копированию из файлового в реляционное хранилище данных. Список хранилищ данных, которые поддерживаются в качестве источников и приемников, см. в разделе Поддерживаемые хранилища данных и форматы.

При работе с этим руководством вы выполните следующие задачи:

  • Создали фабрику данных.
  • Создание связанных служб хранилища Azure и базы данных SQL Azure.
  • Создание наборов данных большого двоичного объекта Azure и базы данных SQL Azure.
  • Создание конвейера с действием копирования.
  • Запуск конвейера.
  • Мониторинг конвейера и выполнения действий.

В этом руководстве используется пакет SDK для .NET. Вы можете использовать другие механизмы для взаимодействия с Фабрикой данных Azure (см. примеры в разделе Краткие руководства).

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.

Необходимые компоненты

Создание большого двоичного объекта и таблицы SQL

Теперь подготовьте большой двоичный объект Azure и Базу данных SQL Azure для работы с этим руководством, создав BLOB-объект в качестве источника и таблицу SQL в качестве приемника.

Создание исходного большого двоичного объекта

Сначала нужно создать исходный BLOB-объект. Для этого создайте контейнер и отправьте в него входной текстовый файл.

  1. Откройте Блокнот. Скопируйте следующий текст и сохраните его в локальный файл с именем inputEmp.txt.

    John|Doe
    Jane|Doe
    
  2. С помощью Обозревателя службы хранилища Azure или аналогичного средства создайте контейнер adfv2tutorial и отправьте в него файл inputEmp.txt.

Создание таблицы-приемника SQL

Теперь создайте таблицу SQL в качестве приемника.

  1. Используйте следующий скрипт SQL, чтобы создать таблицу dbo.emp в базе данных SQL Azure.

    CREATE TABLE dbo.emp
    (
        ID int IDENTITY(1,1) NOT NULL,
        FirstName varchar(50),
        LastName varchar(50)
    )
    GO
    
    CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
    
  2. Предоставьте службам Azure доступ к базе данных SQL. Убедитесь, что доступ к службам Azure разрешен для вашего сервера, чтобы служба "Фабрика данных" могла записывать данные в базу данных SQL. Чтобы проверить и при необходимости включить этот параметр, сделайте следующее.

    1. Перейдите на портал Azure для управления сервером SQL Server. Выполните поиск по фразе Серверы SQL и выберите этот вариант.

    2. Выберите нужный сервер.

    3. В меню сервера SQL выберите в разделе Безопасность элемент Брандмауэры и виртуальные сети.

    4. На странице Брандмауэры и виртуальные сети для параметра Разрешить доступ к серверу службам и ресурсам Azure выберите значение ВКЛ.

Создание проекта Visual Studio

С помощью Visual Studio создайте консольное приложение C# .NET.

  1. Откройте Visual Studio.
  2. В окне Начало работы выберите Создать проект.
  3. В окне Создание проекта выберите версию C# для элемента Консольное приложение (.NET Framework) в списке типов проекта. Затем выберите Далее.
  4. В окне Настроить новый проект введите для параметра Имя проекта значение ADFv2Tutorial. Для параметра Расположение найдите или создайте каталог, в который будет сохранен проект. Затем выберите Создать. Новый проект появится в интегрированной среде разработки Visual Studio.

Установка пакетов Nuget

Теперь установите обязательные пакеты библиотек с помощью диспетчера пакетов NuGet.

  1. В строке меню последовательно выберите Средства>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.

  2. На панели Консоль диспетчера пакетов выполните следующие команды, чтобы установить пакеты. Дополнительные сведения см. в документации по пакету NuGet Microsoft.Azure.Management.DataFactory для Фабрики данных Azure.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Создание клиента фабрики данных

Чтобы создать клиент фабрики данных, сделайте следующее.

  1. Откройте файл Program.cs и замените все существующие инструкции using следующим кодом, чтобы добавить ссылки на пространства имен.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Добавьте приведенный ниже код в метод Main, в котором задаются переменные. Замените значения 14 заполнителей на собственные.

    Список регионов Azure, в которых сейчас доступна Фабрика данных см. на странице Доступность продуктов по регионам. В раскрывающемся списке Продукты выберите Обзор>Аналитика>Фабрика данных. Затем в раскрывающемся списке Регионы выберите все интересующие регионы. Отобразится сетка со сведениями о доступности продуктов Фабрики данных для выбранных регионов.

    Примечание.

    Используемые Фабрикой данных хранилища данных, например служба хранилища Azure и База данных SQL Azure, и вычислительные среды, например HDInsight, могут находиться не в том регионе, который выбран для самой Фабрики данных.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID to create the factory>";
    string resourceGroup = "<your resource group to create the factory>";
    
    string region = "<location to create the data factory in, such as East US>";
    string dataFactoryName = "<name of data factory to create (must be globally unique)>";
    
    // Specify the source Azure Blob information
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    string inputBlobPath = "adfv2tutorial/";
    string inputBlobName = "inputEmp.txt";
    
    // Specify the sink Azure SQL Database information
    string azureSqlConnString =
        "Server=tcp:<your server name>.database.windows.net,1433;" +
        "Database=<your database name>;" +
        "User ID=<your username>@<your server name>;" +
        "Password=<your password>;" +
        "Trusted_Connection=False;Encrypt=True;Connection Timeout=30";
    string azureSqlTableName = "dbo.emp";
    
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string sqlDbLinkedServiceName = "AzureSqlDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string sqlDatasetName = "SqlDataset";
    string pipelineName = "Adfv2TutorialBlobToSqlCopy";
    
  3. Добавьте в метод Main следующий код, который создает экземпляр класса DataFactoryManagementClient. Этот объект используется не только для создания фабрики данных, связанной службы, наборов данных и конвейера, но и для отслеживания подробностей выполнения конвейера.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.azure.com/", cc
    ).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Создание фабрики данных

Добавьте в метод Main следующий код, который создает фабрику данных.

// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};

client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);

while (
    client.Factories.Get(
        resourceGroup, dataFactoryName
    ).ProvisioningState == "PendingCreation"
)
{
    System.Threading.Thread.Sleep(1000);
}

Создание связанных служб

Работая с этим руководством, вы создадите две связанные службы для источника и приемника, соответственно.

Создание связанной службы хранилища Azure

Добавьте в метод Main приведенный ниже код, который создает связанную службу хранилища Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в статье о свойствах связанной службы BLOB-объектов Azure.

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey
        )
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);

Создание связанной службы Базы данных SQL Azure

Добавьте в метод Main приведенный ниже код, который создает связанную службу Базы данных SQL Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в статье о свойствах связанной службы Базы данных SQL Azure.

// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");

LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
    new AzureSqlDatabaseLinkedService
    {
        ConnectionString = new SecureString(azureSqlConnString)
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);

Создайте наборы данных.

В этом разделе вы создадите два набора данных: по одному для источника и приемника.

Создание набора данных для исходного большого двоичного объекта Azure

Добавьте в метод Main приведенный ниже код, который создает набор данных большого двоичного объекта Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе о свойствах набора данных BLOB-объектов Azure.

Задайте набор данных, представляющий исходные данные в большом двоичном объекте Azure. Этот набор данных большого двоичного объекта относится к связанной службе хранилища Azure, созданной на предыдущем шаге, и описывает следующее:

  • расположение большого двоичного объекта для копирования: FolderPath и FileName;
  • формат большого двоичного объекта, указывающий, как анализировать содержимое, TextFormat и все параметры, как например разделитель столбцов;
  • структура данных, включая имена столбцов и типы данных, которые в нашем примере сопоставляются с таблицей-приемником SQL.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = "|" },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "FirstName", Type = "String" },
            new DatasetDataElement { Name = "LastName", Type = "String" }
        }
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);

Создание набора данных для базы данных-приемника SQL Azure

Добавьте в метод Main приведенный ниже код, который создает набор данных Базы данных SQL Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе о свойствах набора данных Базы данных SQL Azure.

Задайте набор данных, который представляет данные приемника в Базе данных SQL Azure. Этот набор данных относится к связанной службе Базы данных SQL Azure, которую вы создали на предыдущем шаге. Он также указывает таблицу SQL, которая содержит копируемые данных.

// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
    new AzureSqlTableDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = sqlDbLinkedServiceName
        },
        TableName = azureSqlTableName
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);

Создание конвейера

Добавьте в метод Main приведенный ниже код, который создает конвейер с действием копирования. В этом руководстве этот конвейер содержит одно действие CopyActivity, которое принимает в качестве источника набор данных большого двоичного объекта, а в качестве приемника — набор данных SQL. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе Действие копирования в Фабрике данных Azure.

// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToSQL",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference() { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = sqlDatasetName }
            },
            Source = new BlobSource { },
            Sink = new SqlSink { }
        }
    }
};

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);

Создание конвейера

Добавьте в метод Main следующий код, который активирует выполнение конвейера.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Мониторинг выполнения конвейера

Теперь вставьте код, чтобы проверять состояния выполнения конвейера и получать сведения о выполнении действий копирования.

  1. Добавьте в метод Main следующий код, который постоянно проверяет состояния выполнения конвейера, пока тот не завершит копирование данных.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId
        );
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Добавьте в метод Main следующий код, который извлекает сведения о выполнении действия копирования, например размер записанных и прочитанных данных.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)
    );
    
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams
    );
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(queryResponse.Value.First().Output);
    }
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Выполнение кода

Создайте приложение, выбрав Сборка>Собрать решение. Создайте и запустите приложение, выбрав Отладка>Начать отладку, а затем проверьте выполнение конвейера.

Консоль выведет ход выполнения создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления сведений о действии копирования с информацией о размере записанных и прочитанных данных. Затем вы можете с помощью SSMS (SQL Server Management Studio), Visual Studio или аналогичных средств подключиться к Базе данных SQL Azure и проверить, скопированы ли данные в указанную таблицу.

Пример полученных результатов

Creating a data factory AdfV2Tutorial...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
      }
    }
  }
}
Creating linked service AzureSqlDbLinkedService...
{
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": "adfv2tutorial/",
      "fileName": "inputEmp.txt",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "|"
      }
    },
    "structure": [
      {
        "name": "FirstName",
        "type": "String"
      },
      {
        "name": "LastName",
        "type": "String"
      }
    ],
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureStorageLinkedService"
    }
  }
}
Creating dataset SqlDataset...
{
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "dbo.emp"
    },
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureSqlDbLinkedService"
    }
  }
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SqlDataset"
          }
        ],
        "name": "CopyFromBlobToSQL"
      }
    ]
  }
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 18,
  "dataWritten": 28,
  "rowsCopied": 2,
  "copyDuration": 2,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
  "usedDataIntegrationUnits": 2,
  "billedDuration": 2
}

Press any key to exit...

В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Вы научились выполнять следующие задачи:

  • Создали фабрику данных.
  • Создание связанных служб хранилища Azure и базы данных SQL Azure.
  • Создание наборов данных большого двоичного объекта Azure и базы данных SQL Azure.
  • Создание конвейера с действием копирования.
  • Запуск конвейера.
  • Мониторинг конвейера и выполнения действий.

Перейдите к следующему руководству, чтобы узнать о копировании данных из локальной среды в облако: