Поделиться через


Копирование данных из Azure Blob в базу данных Azure SQL с использованием Azure Data Factory.

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

В этом руководстве вы создадите конвейер фабрики данных, который копирует данные из Azure Blob Storage в Azure SQL Database. Шаблон конфигурации в этом руководстве применяется к копированию из файлового в реляционное хранилище данных. Список хранилищ данных, которые поддерживаются в качестве источников и приемников, см. в разделе Поддерживаемые хранилища данных и форматы.

При работе с этим руководством вы выполните следующие задачи:

  • Создали фабрику данных.
  • Создайте Azure Storage и связанные службы Azure SQL Database.
  • Создайте наборы данных объектов BLOB Azure и базы данных Azure SQL.
  • Создайте конвейер, содержащий действие копирования.
  • Запуск конвейера.
  • Следите за конвейером и запусками заданий.

В этом руководстве используется пакет SDK .NET. Вы можете использовать другие механизмы для взаимодействия с Azure Data Factory; см. примеры в разделе Quickstarts.

Если у вас нет подписки Azure, создайте учетную запись free Azure перед началом работы.

Предварительные требования

Создание BLOB и таблицы SQL

Теперь подготовьте Azure BLOB-объект и Azure SQL Database для этого руководства, создав исходный BLOB-объект и таблицу SQL-хранилища.

Создание исходного BLOB

Сначала нужно создать исходный BLOB-объект. Для этого создайте контейнер и отправьте в него входной текстовый файл.

  1. Откройте Блокнот. Скопируйте следующий текст и сохраните его в локальный файл с именем inputEmp.txt.

    John|Doe
    Jane|Doe
    
  2. Используйте средство, например Azure Storage Explorer для создания контейнера adfv2tutorial и отправки файла inputEmp.txt в контейнер.

Создание таблицы-приемника SQL

Далее создайте таблицу SQL для приема данных.

  1. Используйте следующий скрипт SQL, чтобы создать таблицу dbo.emp в Azure SQL Database.

    CREATE TABLE dbo.emp
    (
        ID int IDENTITY(1,1) NOT NULL,
        FirstName varchar(50),
        LastName varchar(50)
    )
    GO
    
    CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
    
  2. Разрешить службам Azure доступ к базе данных SQL. Убедитесь, что вы разрешаете доступ к службам Azure на сервере, чтобы служба фабрики данных могла записывать данные в базу данных SQL. Чтобы проверить и при необходимости включить этот параметр, сделайте следующее.

    1. Перейдите на портал Azure для управления сервером SQL Server. Выполните поиск по фразе Серверы SQL и выберите этот вариант.

    2. Выберите нужный сервер.

    3. В меню сервера SQL выберите в разделе Безопасность элемент Брандмауэры и виртуальные сети.

    4. На странице Брандмауэр и виртуальные сети в разделе Разрешить службам и ресурсам Azure доступ к этому серверу выберите ВКЛ.

Создание проекта Visual Studio

С помощью Visual Studio создайте консольное приложение .NET C#.

  1. Откройте Visual Studio.
  2. В окне Начало работы выберите Создать проект.
  3. В окне Create новый проект выберите версию C# Console App (.NET Framework) из списка типов проектов. Затем выберите Далее.
  4. В окне Настройка вашего нового проекта введите название проектаADFv2Tutorial. Для параметра Расположение найдите или создайте каталог, в который будет сохранен проект. Затем выберите Создать. Новый проект появится в Visual Studio IDE.

Установка пакетов Nuget

Теперь установите обязательные пакеты библиотек с помощью диспетчера пакетов NuGet.

  1. В строке меню выберите Tools>NuGet Package Manager>Package Manager Console.

  2. В панели консоль Package Manager выполните следующие команды для установки пакетов. Сведения о пакете NuGet Azure Data Factory см. в разделе Microsoft.Azure. Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Создать клиента для фабрики данных

Чтобы создать клиент фабрики данных, сделайте следующее.

  1. Откройте файл Program.cs и замените все существующие инструкции using следующим кодом, чтобы добавить ссылки на пространства имен.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Добавьте приведенный ниже код в метод Main, в котором задаются переменные. Замените значения 14 заполнителей на собственные.

    Чтобы просмотреть список регионов Azure, в которых сейчас доступен Data Factory, см. в разделе Продукты, доступные по регионам. В раскрывающемся списке Продукты выберите Обзор>Аналитика>Фабрика данных. Затем в раскрывающемся списке Регионы выберите все интересующие регионы. Отобразится сетка со сведениями о доступности продуктов Фабрики данных для выбранных регионов.

    Примечание.

    Хранилища данных, такие как Azure Storage и Azure SQL Database, и вычислительные ресурсы, такие как HDInsight, используются фабрикой данных, могут находиться в других регионах, отличных от того, что вы выбираете для фабрики данных.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID to create the factory>";
    string resourceGroup = "<your resource group to create the factory>";
    
    string region = "<location to create the data factory in, such as East US>";
    string dataFactoryName = "<name of data factory to create (must be globally unique)>";
    
    // Specify the source Azure Blob information
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    string inputBlobPath = "adfv2tutorial/";
    string inputBlobName = "inputEmp.txt";
    
    // Specify the sink Azure SQL Database information
    string azureSqlConnString =
        "Server=tcp:<your server name>.database.windows.net,1433;" +
        "Database=<your database name>;" +
        "User ID=<your username>@<your server name>;" +
        "Password=<your password>;" +
        "Trusted_Connection=False;Encrypt=True;Connection Timeout=30";
    string azureSqlTableName = "dbo.emp";
    
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string sqlDbLinkedServiceName = "AzureSqlDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string sqlDatasetName = "SqlDataset";
    string pipelineName = "Adfv2TutorialBlobToSqlCopy";
    
  3. Добавьте в метод Main следующий код, который создает экземпляр класса DataFactoryManagementClient. Этот объект используется для создания фабрики данных, связанной службы, наборов данных и конвейера. Вы также используете этот объект для отслеживания деталей выполнения конвейера.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.azure.com/", cc
    ).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Создание фабрики данных

Добавьте в метод Main следующий код, который создает фабрику данных.

// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};

client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);

while (
    client.Factories.Get(
        resourceGroup, dataFactoryName
    ).ProvisioningState == "PendingCreation"
)
{
    System.Threading.Thread.Sleep(1000);
}

Создание связанных служб

Работая с этим руководством, вы создадите две связанные службы для источника и приемника, соответственно.

Создание связанной службы Azure Storage

Добавьте следующий код в метод Main, который создает связанную службу Azure Storage. Сведения о поддерживаемых свойствах и деталях см. в разделе свойства связанной службы Azure Blob.

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey
        )
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);

Создание связанной службы Azure SQL Database

Добавьте следующий код в метод Main, который создает связанную службу Azure SQL Database. Сведения о поддерживаемых свойствах и подробностях см. в разделе Свойства связанной службы Azure SQL Database.

// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");

LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
    new AzureSqlDatabaseLinkedService
    {
        ConnectionString = new SecureString(azureSqlConnString)
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);

Создайте наборы данных.

В этом разделе вы создадите два набора данных: по одному для источника и приемника.

Создание набора данных для исходного Azure BLOB-объекта

Добавьте следующий код в метод Main, который создает набор данных объектов blob Azure. Сведения о поддерживаемых свойствах и деталях см. в разделе свойства набора данных Azure Blob.

Вы определяете набор данных, представляющий исходные данные в Azure BLOB-объекте. Этот набор данных BLOB-объектов ссылается на связанную службу Azure Storage, созданную на предыдущем шаге, и описывает следующее:

  • Расположение BLOB, с которого необходимо выполнить копирование: FolderPath и FileName.
  • Формат BLOB, указывающий, как анализировать содержимое: TextFormat и его параметры, такие как разделитель столбцов.
  • структура данных, включая имена столбцов и типы данных, которые в нашем примере сопоставляются с таблицей-приемником SQL.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = "|" },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "FirstName", Type = "String" },
            new DatasetDataElement { Name = "LastName", Type = "String" }
        }
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);

Создание набора данных для приемника Azure SQL Database

Добавьте следующий код в метод Main, который создает набор данных Azure SQL Database. Для получения информации о поддерживаемых свойствах и других деталях см. раздел Свойства набора данных Azure SQL Database.

Вы определяете набор данных, представляющий данные приемника в Azure SQL Database. Этот набор данных относится к связанной службе Azure SQL Database, созданной на предыдущем шаге. Он также указывает таблицу SQL, которая содержит копируемые данных.

// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
    new AzureSqlTableDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = sqlDbLinkedServiceName
        },
        TableName = azureSqlTableName
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);

Создание конвейера

Добавьте следующий код в метод Main, который создает конвейер с операцией копирования. В этом руководстве этот конвейер содержит одно действие CopyActivity, которое принимает в качестве источника набор данных Blob, а в качестве приемника — набор данных SQL. Информацию о сведениях о копировании данных см. в разделе Copy activity в Azure Data Factory.

// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToSQL",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference() { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = sqlDatasetName }
            },
            Source = new BlobSource { },
            Sink = new SqlSink { }
        }
    }
};

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);

Создание конвейера

Добавьте в метод Main следующий код, который активирует выполнение конвейера.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Мониторинг выполнения конвейера

Теперь вставьте код, чтобы проверять состояния выполнения конвейера и получать сведения о выполнении действий копирования.

  1. Добавьте в метод Main следующий код, который постоянно проверяет состояния выполнения конвейера, пока тот не завершит копирование данных.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId
        );
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Добавьте в метод Main следующий код, который извлекает сведения о выполнении действия копирования, например размер записанных и прочитанных данных.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)
    );
    
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams
    );
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(queryResponse.Value.First().Output);
    }
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Выполнение кода

Создайте приложение, выбрав Build>Собрать решение. Создайте и запустите приложение, выбрав Отладка>Начать отладку, а затем проверьте выполнение конвейера.

Консоль печатает процесс создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления деталей процесса копирования с информацией о размере прочитанных и записанных данных. Затем с помощью таких средств, как SQL Server Management Studio (SSMS) или Visual Studio, можно подключиться к целевому Azure SQL Database и проверить, содержит ли указанная целевая таблица скопированные данные.

Пример полученных результатов

Creating a data factory AdfV2Tutorial...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
      }
    }
  }
}
Creating linked service AzureSqlDbLinkedService...
{
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": "adfv2tutorial/",
      "fileName": "inputEmp.txt",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "|"
      }
    },
    "structure": [
      {
        "name": "FirstName",
        "type": "String"
      },
      {
        "name": "LastName",
        "type": "String"
      }
    ],
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureStorageLinkedService"
    }
  }
}
Creating dataset SqlDataset...
{
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "dbo.emp"
    },
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureSqlDbLinkedService"
    }
  }
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SqlDataset"
          }
        ],
        "name": "CopyFromBlobToSQL"
      }
    ]
  }
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 18,
  "dataWritten": 28,
  "rowsCopied": 2,
  "copyDuration": 2,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
  "usedDataIntegrationUnits": 2,
  "billedDuration": 2
}

Press any key to exit...

Pipeline в этом примере копирует данные из одного места в другое место в хранилище блобов Azure. Вы научились выполнять следующие задачи:

  • Создали фабрику данных.
  • Создайте Azure Storage и связанные службы Azure SQL Database.
  • Создайте наборы данных объектов BLOB Azure и базы данных Azure SQL.
  • Создайте конвейер с действием копирования.
  • Запуск конвейера.
  • Следите за конвейером и запусками заданий.

Перейдите к следующему руководству, чтобы узнать о копировании данных из локальной среды в облако: