Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ПРИМЕНИМО К:
Azure Data Factory
Azure Synapse Analytics
Совет
Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.
В этом руководстве вы создадите конвейер фабрики данных, который копирует данные из Azure Blob Storage в Azure SQL Database. Шаблон конфигурации в этом руководстве применяется к копированию из файлового в реляционное хранилище данных. Список хранилищ данных, которые поддерживаются в качестве источников и приемников, см. в разделе Поддерживаемые хранилища данных и форматы.
При работе с этим руководством вы выполните следующие задачи:
- Создали фабрику данных.
- Создайте Azure Storage и связанные службы Azure SQL Database.
- Создайте наборы данных объектов BLOB Azure и базы данных Azure SQL.
- Создайте конвейер, содержащий действие копирования.
- Запуск конвейера.
- Следите за конвейером и запусками заданий.
В этом руководстве используется пакет SDK .NET. Вы можете использовать другие механизмы для взаимодействия с Azure Data Factory; см. примеры в разделе Quickstarts.
Если у вас нет подписки Azure, создайте учетную запись free Azure перед началом работы.
Предварительные требования
- учетная запись Azure Storage. Вы используете хранилище BLOB-объектов в качестве источника данных. Если у вас нет учетной записи хранилища Azure, см. раздел Создание учетной записи хранилища общего назначения.
- База данных SQL Azure. Вы используете базу данных как хранилище данных-приемник. Если у вас нет базы данных в Azure SQL Database, см. статью Создание базы данных в Azure SQL Database.
- Visual Studio. В пошаговом руководстве в этой статье используется Visual Studio 2019.
- Azure SDK для .NET.
- приложение Microsoft Entra. Если у вас нет приложения Microsoft Entra, см. раздел Создание приложения Microsoft Entra в Руководстве: Использование портала для создания приложения Microsoft Entra. Скопируйте следующие значения для использования в последующих шагах: идентификатор приложения (клиента), ключ проверки подлинности и идентификатор каталога (клиента). Назначьте приложению роль Участник, следуя указаниям в той же статье.
Создание BLOB и таблицы SQL
Теперь подготовьте Azure BLOB-объект и Azure SQL Database для этого руководства, создав исходный BLOB-объект и таблицу SQL-хранилища.
Создание исходного BLOB
Сначала нужно создать исходный BLOB-объект. Для этого создайте контейнер и отправьте в него входной текстовый файл.
Откройте Блокнот. Скопируйте следующий текст и сохраните его в локальный файл с именем inputEmp.txt.
John|Doe Jane|DoeИспользуйте средство, например Azure Storage Explorer для создания контейнера adfv2tutorial и отправки файла inputEmp.txt в контейнер.
Создание таблицы-приемника SQL
Далее создайте таблицу SQL для приема данных.
Используйте следующий скрипт SQL, чтобы создать таблицу dbo.emp в Azure SQL Database.
CREATE TABLE dbo.emp ( ID int IDENTITY(1,1) NOT NULL, FirstName varchar(50), LastName varchar(50) ) GO CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);Разрешить службам Azure доступ к базе данных SQL. Убедитесь, что вы разрешаете доступ к службам Azure на сервере, чтобы служба фабрики данных могла записывать данные в базу данных SQL. Чтобы проверить и при необходимости включить этот параметр, сделайте следующее.
Перейдите на портал Azure для управления сервером SQL Server. Выполните поиск по фразе Серверы SQL и выберите этот вариант.
Выберите нужный сервер.
В меню сервера SQL выберите в разделе Безопасность элемент Брандмауэры и виртуальные сети.
На странице Брандмауэр и виртуальные сети в разделе Разрешить службам и ресурсам Azure доступ к этому серверу выберите ВКЛ.
Создание проекта Visual Studio
С помощью Visual Studio создайте консольное приложение .NET C#.
- Откройте Visual Studio.
- В окне Начало работы выберите Создать проект.
- В окне Create новый проект выберите версию C# Console App (.NET Framework) из списка типов проектов. Затем выберите Далее.
- В окне Настройка вашего нового проекта введите название проектаADFv2Tutorial. Для параметра Расположение найдите или создайте каталог, в который будет сохранен проект. Затем выберите Создать. Новый проект появится в Visual Studio IDE.
Установка пакетов Nuget
Теперь установите обязательные пакеты библиотек с помощью диспетчера пакетов NuGet.
В строке меню выберите Tools>NuGet Package Manager>Package Manager Console.
В панели консоль Package Manager выполните следующие команды для установки пакетов. Сведения о пакете NuGet Azure Data Factory см. в разделе Microsoft.Azure. Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Создать клиента для фабрики данных
Чтобы создать клиент фабрики данных, сделайте следующее.
Откройте файл Program.cs и замените все существующие инструкции
usingследующим кодом, чтобы добавить ссылки на пространства имен.using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;Добавьте приведенный ниже код в метод
Main, в котором задаются переменные. Замените значения 14 заполнителей на собственные.Чтобы просмотреть список регионов Azure, в которых сейчас доступен Data Factory, см. в разделе Продукты, доступные по регионам. В раскрывающемся списке Продукты выберите Обзор>Аналитика>Фабрика данных. Затем в раскрывающемся списке Регионы выберите все интересующие регионы. Отобразится сетка со сведениями о доступности продуктов Фабрики данных для выбранных регионов.
Примечание.
Хранилища данных, такие как Azure Storage и Azure SQL Database, и вычислительные ресурсы, такие как HDInsight, используются фабрикой данных, могут находиться в других регионах, отличных от того, что вы выбираете для фабрики данных.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID to create the factory>"; string resourceGroup = "<your resource group to create the factory>"; string region = "<location to create the data factory in, such as East US>"; string dataFactoryName = "<name of data factory to create (must be globally unique)>"; // Specify the source Azure Blob information string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; string inputBlobPath = "adfv2tutorial/"; string inputBlobName = "inputEmp.txt"; // Specify the sink Azure SQL Database information string azureSqlConnString = "Server=tcp:<your server name>.database.windows.net,1433;" + "Database=<your database name>;" + "User ID=<your username>@<your server name>;" + "Password=<your password>;" + "Trusted_Connection=False;Encrypt=True;Connection Timeout=30"; string azureSqlTableName = "dbo.emp"; string storageLinkedServiceName = "AzureStorageLinkedService"; string sqlDbLinkedServiceName = "AzureSqlDbLinkedService"; string blobDatasetName = "BlobDataset"; string sqlDatasetName = "SqlDataset"; string pipelineName = "Adfv2TutorialBlobToSqlCopy";Добавьте в метод
Mainследующий код, который создает экземпляр классаDataFactoryManagementClient. Этот объект используется для создания фабрики данных, связанной службы, наборов данных и конвейера. Вы также используете этот объект для отслеживания деталей выполнения конвейера.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync( "https://management.azure.com/", cc ).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Создание фабрики данных
Добавьте в метод Main следующий код, который создает фабрику данных.
// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);
while (
client.Factories.Get(
resourceGroup, dataFactoryName
).ProvisioningState == "PendingCreation"
)
{
System.Threading.Thread.Sleep(1000);
}
Создание связанных служб
Работая с этим руководством, вы создадите две связанные службы для источника и приемника, соответственно.
Создание связанной службы Azure Storage
Добавьте следующий код в метод Main, который создает связанную службу Azure Storage. Сведения о поддерживаемых свойствах и деталях см. в разделе свойства связанной службы Azure Blob.
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey
)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);
Создание связанной службы Azure SQL Database
Добавьте следующий код в метод Main, который создает связанную службу Azure SQL Database. Сведения о поддерживаемых свойствах и подробностях см. в разделе Свойства связанной службы Azure SQL Database.
// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");
LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
new AzureSqlDatabaseLinkedService
{
ConnectionString = new SecureString(azureSqlConnString)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);
Создайте наборы данных.
В этом разделе вы создадите два набора данных: по одному для источника и приемника.
Создание набора данных для исходного Azure BLOB-объекта
Добавьте следующий код в метод Main, который создает набор данных объектов blob Azure. Сведения о поддерживаемых свойствах и деталях см. в разделе свойства набора данных Azure Blob.
Вы определяете набор данных, представляющий исходные данные в Azure BLOB-объекте. Этот набор данных BLOB-объектов ссылается на связанную службу Azure Storage, созданную на предыдущем шаге, и описывает следующее:
- Расположение BLOB, с которого необходимо выполнить копирование:
FolderPathиFileName. - Формат BLOB, указывающий, как анализировать содержимое:
TextFormatи его параметры, такие как разделитель столбцов. - структура данных, включая имена столбцов и типы данных, которые в нашем примере сопоставляются с таблицей-приемником SQL.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference {
ReferenceName = storageLinkedServiceName
},
FolderPath = inputBlobPath,
FileName = inputBlobName,
Format = new TextFormat { ColumnDelimiter = "|" },
Structure = new List<DatasetDataElement>
{
new DatasetDataElement { Name = "FirstName", Type = "String" },
new DatasetDataElement { Name = "LastName", Type = "String" }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);
Создание набора данных для приемника Azure SQL Database
Добавьте следующий код в метод Main, который создает набор данных Azure SQL Database. Для получения информации о поддерживаемых свойствах и других деталях см. раздел Свойства набора данных Azure SQL Database.
Вы определяете набор данных, представляющий данные приемника в Azure SQL Database. Этот набор данных относится к связанной службе Azure SQL Database, созданной на предыдущем шаге. Он также указывает таблицу SQL, которая содержит копируемые данных.
// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
new AzureSqlTableDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = sqlDbLinkedServiceName
},
TableName = azureSqlTableName
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);
Создание конвейера
Добавьте следующий код в метод Main, который создает конвейер с операцией копирования. В этом руководстве этот конвейер содержит одно действие CopyActivity, которое принимает в качестве источника набор данных Blob, а в качестве приемника — набор данных SQL. Информацию о сведениях о копировании данных см. в разделе Copy activity в Azure Data Factory.
// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToSQL",
Inputs = new List<DatasetReference>
{
new DatasetReference() { ReferenceName = blobDatasetName }
},
Outputs = new List<DatasetReference>
{
new DatasetReference { ReferenceName = sqlDatasetName }
},
Source = new BlobSource { },
Sink = new SqlSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);
Создание конвейера
Добавьте в метод Main следующий код, который активирует выполнение конвейера.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Мониторинг выполнения конвейера
Теперь вставьте код, чтобы проверять состояния выполнения конвейера и получать сведения о выполнении действий копирования.
Добавьте в метод
Mainследующий код, который постоянно проверяет состояния выполнения конвейера, пока тот не завершит копирование данных.// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId ); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }Добавьте в метод
Mainследующий код, который извлекает сведения о выполнении действия копирования, например размер записанных и прочитанных данных.// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10) ); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams ); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(queryResponse.Value.First().Output); } else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Выполнение кода
Создайте приложение, выбрав Build>Собрать решение. Создайте и запустите приложение, выбрав Отладка>Начать отладку, а затем проверьте выполнение конвейера.
Консоль печатает процесс создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления деталей процесса копирования с информацией о размере прочитанных и записанных данных. Затем с помощью таких средств, как SQL Server Management Studio (SSMS) или Visual Studio, можно подключиться к целевому Azure SQL Database и проверить, содержит ли указанная целевая таблица скопированные данные.
Пример полученных результатов
Creating a data factory AdfV2Tutorial...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
}
}
}
}
Creating linked service AzureSqlDbLinkedService...
{
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": "adfv2tutorial/",
"fileName": "inputEmp.txt",
"format": {
"type": "TextFormat",
"columnDelimiter": "|"
}
},
"structure": [
{
"name": "FirstName",
"type": "String"
},
{
"name": "LastName",
"type": "String"
}
],
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
}
Creating dataset SqlDataset...
{
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "dbo.emp"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureSqlDbLinkedService"
}
}
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SqlDataset"
}
],
"name": "CopyFromBlobToSQL"
}
]
}
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 18,
"dataWritten": 28,
"rowsCopied": 2,
"copyDuration": 2,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 2
}
Press any key to exit...
Связанный контент
Pipeline в этом примере копирует данные из одного места в другое место в хранилище блобов Azure. Вы научились выполнять следующие задачи:
- Создали фабрику данных.
- Создайте Azure Storage и связанные службы Azure SQL Database.
- Создайте наборы данных объектов BLOB Azure и базы данных Azure SQL.
- Создайте конвейер с действием копирования.
- Запуск конвейера.
- Следите за конвейером и запусками заданий.
Перейдите к следующему руководству, чтобы узнать о копировании данных из локальной среды в облако: