Копирование данных из хранилища BLOB-объектов Azure в Базу данных SQL Azure с помощью фабрики данных Azure
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
В этом руководстве вы создадите фабрику данных с конвейером, который перемещает данные из хранилища BLOB-объектов Azure в Базу данных SQL Azure. Шаблон конфигурации в этом руководстве применяется к копированию из файлового в реляционное хранилище данных. Список хранилищ данных, которые поддерживаются в качестве источников и приемников, см. в разделе Поддерживаемые хранилища данных и форматы.
При работе с этим руководством вы выполните следующие задачи:
- Создали фабрику данных.
- Создание связанных служб хранилища Azure и базы данных SQL Azure.
- Создание наборов данных большого двоичного объекта Azure и базы данных SQL Azure.
- Создание конвейера с действием копирования.
- Запуск конвейера.
- Мониторинг конвейера и выполнения действий.
В этом руководстве используется пакет SDK для .NET. Вы можете использовать другие механизмы для взаимодействия с Фабрикой данных Azure (см. примеры в разделе Краткие руководства).
Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
Необходимые компоненты
- Учетная запись хранения Azure. В этом руководстве в качестве источника будет использоваться хранилище BLOB-объектов. Если у вас нет учетной записи хранения Azure, см. инструкции по созданию учетной записи хранения общего назначения.
- База данных SQL Azure. Используйте базу данных как хранилище данных-приемник. Если у вас нет базы данных в службе "База данных SQL Azure", воспользуйтесь кратким руководством Создание отдельной базы данных в Базе данных SQL Azure.
- Visual Studio. В этом пошаговом руководстве используется Visual Studio 2019.
- Пакет Azure SDK для .NET.
- Приложение Microsoft Entra. Если у вас нет приложения Microsoft Entra, см. раздел "Создание приложения Microsoft Entra" статьи "Практическое руководство. Создание приложения Microsoft Entra с помощью портала". Скопируйте следующие значения для использования в последующих шагах: идентификатор приложения (клиента), ключ проверки подлинности и идентификатор каталога (клиента). Назначьте приложению роль Участник, следуя указаниям в той же статье.
Создание большого двоичного объекта и таблицы SQL
Теперь подготовьте большой двоичный объект Azure и Базу данных SQL Azure для работы с этим руководством, создав BLOB-объект в качестве источника и таблицу SQL в качестве приемника.
Создание исходного большого двоичного объекта
Сначала нужно создать исходный BLOB-объект. Для этого создайте контейнер и отправьте в него входной текстовый файл.
Откройте Блокнот. Скопируйте следующий текст и сохраните его в локальный файл с именем inputEmp.txt.
John|Doe Jane|Doe
С помощью Обозревателя службы хранилища Azure или аналогичного средства создайте контейнер adfv2tutorial и отправьте в него файл inputEmp.txt.
Создание таблицы-приемника SQL
Теперь создайте таблицу SQL в качестве приемника.
Используйте следующий скрипт SQL, чтобы создать таблицу dbo.emp в базе данных SQL Azure.
CREATE TABLE dbo.emp ( ID int IDENTITY(1,1) NOT NULL, FirstName varchar(50), LastName varchar(50) ) GO CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
Предоставьте службам Azure доступ к базе данных SQL. Убедитесь, что доступ к службам Azure разрешен для вашего сервера, чтобы служба "Фабрика данных" могла записывать данные в базу данных SQL. Чтобы проверить и при необходимости включить этот параметр, сделайте следующее.
Перейдите на портал Azure для управления сервером SQL Server. Выполните поиск по фразе Серверы SQL и выберите этот вариант.
Выберите нужный сервер.
В меню сервера SQL выберите в разделе Безопасность элемент Брандмауэры и виртуальные сети.
На странице Брандмауэры и виртуальные сети для параметра Разрешить доступ к серверу службам и ресурсам Azure выберите значение ВКЛ.
Создание проекта Visual Studio
С помощью Visual Studio создайте консольное приложение C# .NET.
- Откройте Visual Studio.
- В окне Начало работы выберите Создать проект.
- В окне Создание проекта выберите версию C# для элемента Консольное приложение (.NET Framework) в списке типов проекта. Затем выберите Далее.
- В окне Настроить новый проект введите для параметра Имя проекта значение ADFv2Tutorial. Для параметра Расположение найдите или создайте каталог, в который будет сохранен проект. Затем выберите Создать. Новый проект появится в интегрированной среде разработки Visual Studio.
Установка пакетов Nuget
Теперь установите обязательные пакеты библиотек с помощью диспетчера пакетов NuGet.
В строке меню последовательно выберите Средства>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.
На панели Консоль диспетчера пакетов выполните следующие команды, чтобы установить пакеты. Дополнительные сведения см. в документации по пакету NuGet Microsoft.Azure.Management.DataFactory для Фабрики данных Azure.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Создание клиента фабрики данных
Чтобы создать клиент фабрики данных, сделайте следующее.
Откройте файл Program.cs и замените все существующие инструкции
using
следующим кодом, чтобы добавить ссылки на пространства имен.using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;
Добавьте приведенный ниже код в метод
Main
, в котором задаются переменные. Замените значения 14 заполнителей на собственные.Список регионов Azure, в которых сейчас доступна Фабрика данных см. на странице Доступность продуктов по регионам. В раскрывающемся списке Продукты выберите Обзор>Аналитика>Фабрика данных. Затем в раскрывающемся списке Регионы выберите все интересующие регионы. Отобразится сетка со сведениями о доступности продуктов Фабрики данных для выбранных регионов.
Примечание.
Используемые Фабрикой данных хранилища данных, например служба хранилища Azure и База данных SQL Azure, и вычислительные среды, например HDInsight, могут находиться не в том регионе, который выбран для самой Фабрики данных.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID to create the factory>"; string resourceGroup = "<your resource group to create the factory>"; string region = "<location to create the data factory in, such as East US>"; string dataFactoryName = "<name of data factory to create (must be globally unique)>"; // Specify the source Azure Blob information string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; string inputBlobPath = "adfv2tutorial/"; string inputBlobName = "inputEmp.txt"; // Specify the sink Azure SQL Database information string azureSqlConnString = "Server=tcp:<your server name>.database.windows.net,1433;" + "Database=<your database name>;" + "User ID=<your username>@<your server name>;" + "Password=<your password>;" + "Trusted_Connection=False;Encrypt=True;Connection Timeout=30"; string azureSqlTableName = "dbo.emp"; string storageLinkedServiceName = "AzureStorageLinkedService"; string sqlDbLinkedServiceName = "AzureSqlDbLinkedService"; string blobDatasetName = "BlobDataset"; string sqlDatasetName = "SqlDataset"; string pipelineName = "Adfv2TutorialBlobToSqlCopy";
Добавьте в метод
Main
следующий код, который создает экземпляр классаDataFactoryManagementClient
. Этот объект используется не только для создания фабрики данных, связанной службы, наборов данных и конвейера, но и для отслеживания подробностей выполнения конвейера.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync( "https://management.azure.com/", cc ).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Создание фабрики данных
Добавьте в метод Main
следующий код, который создает фабрику данных.
// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);
while (
client.Factories.Get(
resourceGroup, dataFactoryName
).ProvisioningState == "PendingCreation"
)
{
System.Threading.Thread.Sleep(1000);
}
Создание связанных служб
Работая с этим руководством, вы создадите две связанные службы для источника и приемника, соответственно.
Создание связанной службы хранилища Azure
Добавьте в метод Main
приведенный ниже код, который создает связанную службу хранилища Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в статье о свойствах связанной службы BLOB-объектов Azure.
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey
)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);
Создание связанной службы Базы данных SQL Azure
Добавьте в метод Main
приведенный ниже код, который создает связанную службу Базы данных SQL Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в статье о свойствах связанной службы Базы данных SQL Azure.
// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");
LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
new AzureSqlDatabaseLinkedService
{
ConnectionString = new SecureString(azureSqlConnString)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);
Создайте наборы данных.
В этом разделе вы создадите два набора данных: по одному для источника и приемника.
Создание набора данных для исходного большого двоичного объекта Azure
Добавьте в метод Main
приведенный ниже код, который создает набор данных большого двоичного объекта Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе о свойствах набора данных BLOB-объектов Azure.
Задайте набор данных, представляющий исходные данные в большом двоичном объекте Azure. Этот набор данных большого двоичного объекта относится к связанной службе хранилища Azure, созданной на предыдущем шаге, и описывает следующее:
- расположение большого двоичного объекта для копирования:
FolderPath
иFileName
; - формат большого двоичного объекта, указывающий, как анализировать содержимое,
TextFormat
и все параметры, как например разделитель столбцов; - структура данных, включая имена столбцов и типы данных, которые в нашем примере сопоставляются с таблицей-приемником SQL.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference {
ReferenceName = storageLinkedServiceName
},
FolderPath = inputBlobPath,
FileName = inputBlobName,
Format = new TextFormat { ColumnDelimiter = "|" },
Structure = new List<DatasetDataElement>
{
new DatasetDataElement { Name = "FirstName", Type = "String" },
new DatasetDataElement { Name = "LastName", Type = "String" }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);
Создание набора данных для базы данных-приемника SQL Azure
Добавьте в метод Main
приведенный ниже код, который создает набор данных Базы данных SQL Azure. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе о свойствах набора данных Базы данных SQL Azure.
Задайте набор данных, который представляет данные приемника в Базе данных SQL Azure. Этот набор данных относится к связанной службе Базы данных SQL Azure, которую вы создали на предыдущем шаге. Он также указывает таблицу SQL, которая содержит копируемые данных.
// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
new AzureSqlTableDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = sqlDbLinkedServiceName
},
TableName = azureSqlTableName
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);
Создание конвейера
Добавьте в метод Main
приведенный ниже код, который создает конвейер с действием копирования. В этом руководстве этот конвейер содержит одно действие CopyActivity
, которое принимает в качестве источника набор данных большого двоичного объекта, а в качестве приемника — набор данных SQL. Информацию о поддерживаемых свойствах и подробные сведения см. в разделе Действие копирования в Фабрике данных Azure.
// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToSQL",
Inputs = new List<DatasetReference>
{
new DatasetReference() { ReferenceName = blobDatasetName }
},
Outputs = new List<DatasetReference>
{
new DatasetReference { ReferenceName = sqlDatasetName }
},
Source = new BlobSource { },
Sink = new SqlSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);
Создание конвейера
Добавьте в метод Main
следующий код, который активирует выполнение конвейера.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Мониторинг выполнения конвейера
Теперь вставьте код, чтобы проверять состояния выполнения конвейера и получать сведения о выполнении действий копирования.
Добавьте в метод
Main
следующий код, который постоянно проверяет состояния выполнения конвейера, пока тот не завершит копирование данных.// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId ); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }
Добавьте в метод
Main
следующий код, который извлекает сведения о выполнении действия копирования, например размер записанных и прочитанных данных.// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10) ); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams ); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(queryResponse.Value.First().Output); } else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Выполнение кода
Создайте приложение, выбрав Сборка>Собрать решение. Создайте и запустите приложение, выбрав Отладка>Начать отладку, а затем проверьте выполнение конвейера.
Консоль выведет ход выполнения создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления сведений о действии копирования с информацией о размере записанных и прочитанных данных. Затем вы можете с помощью SSMS (SQL Server Management Studio), Visual Studio или аналогичных средств подключиться к Базе данных SQL Azure и проверить, скопированы ли данные в указанную таблицу.
Пример полученных результатов
Creating a data factory AdfV2Tutorial...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
}
}
}
}
Creating linked service AzureSqlDbLinkedService...
{
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": "adfv2tutorial/",
"fileName": "inputEmp.txt",
"format": {
"type": "TextFormat",
"columnDelimiter": "|"
}
},
"structure": [
{
"name": "FirstName",
"type": "String"
},
{
"name": "LastName",
"type": "String"
}
],
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
}
Creating dataset SqlDataset...
{
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "dbo.emp"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureSqlDbLinkedService"
}
}
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SqlDataset"
}
],
"name": "CopyFromBlobToSQL"
}
]
}
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 18,
"dataWritten": 28,
"rowsCopied": 2,
"copyDuration": 2,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 2
}
Press any key to exit...
Связанный контент
В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Вы научились выполнять следующие задачи:
- Создали фабрику данных.
- Создание связанных служб хранилища Azure и базы данных SQL Azure.
- Создание наборов данных большого двоичного объекта Azure и базы данных SQL Azure.
- Создание конвейера с действием копирования.
- Запуск конвейера.
- Мониторинг конвейера и выполнения действий.
Перейдите к следующему руководству, чтобы узнать о копировании данных из локальной среды в облако: