Краткое руководство. Создание фабрики данных и конвейера с помощью пакета SDK .NET
ОБЛАСТЬ ПРИМЕНЕНИЯ:Фабрика данных Azure
Azure Synapse Analytics
В этом кратком руководстве показано, как создать фабрику данных Azure с помощью пакета SDK для .NET. Конвейер, который вы создадите в этой фабрике данных, копирует данные из одной папки в другую в хранилище BLOB-объектов Azure. Инструкции по преобразованию данных с помощью Фабрики данных Azure см. в статье Преобразование данных с помощью действия Spark в фабрике данных Azure.
Примечание
Эта статья не содержит подробный обзор службы фабрики данных. Общие сведения о службе фабрики данных Azure см. в статье Введение в фабрику данных Azure.
Предварительные требования
Подписка Azure.
Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начинать работу.
Роли Azure
Чтобы создать экземпляры фабрики данных, нужно назначить учетной записи пользователя, используемой для входа в Azure, роль участника, владельца либо администратора подписки Azure. Чтобы просмотреть имеющиеся разрешения в подписке на портале Azure, выберите имя пользователя в правом верхнем углу и щелкните значок ... для выбора дополнительных параметров, а затем выберите Мои разрешения. Если у вас есть доступ к нескольким подпискам, выберите соответствующую подписку.
Чтобы создавать дочерние ресурсы для службы "Фабрика данных", в том числе наборы данных, связанные службы, конвейеры, триггеры и среды выполнения интеграции, а также управлять ими, выполните следующие требования:
- Чтобы создавать дочерние ресурсы и управлять ими на портале Azure, необходимо иметь роль Участник Фабрики данных на уровне группы ресурсов или более высоком.
- Чтобы создавать дочерние ресурсы и управлять ими с помощью PowerShell или пакета SDK, достаточно роли Участник на уровне ресурса или более высоком.
Примеры инструкций по назначению пользователю роли см. в статье Добавление или изменение администраторов подписки Azure.
Дополнительные сведения см. в следующих статьях:
- Участник Фабрики данных
- Roles and permissions for Azure Data Factory (Роли и разрешения для службы "Фабрика данных Azure")
Учетная запись хранения Azure
В этом кратком руководстве в качестве исходного и целевого хранилища данных используется учетная запись хранения Azure общего назначения (в частности, хранилища BLOB-объектов). Если у вас нет учетной записи хранения Azure общего назначения, см. инструкции по ее созданию.
Получение имени учетной записи хранения
Для выполнения инструкций этого краткого руководства вам потребуется имя учетной записи хранения Azure. Далее описана процедура получения имени учетной записи хранения.
- В веб-браузере перейдите к Портал Azure и выполните вход с помощью имени пользователя и пароля Azure.
- В меню портала Azure выберите Все службы, а затем выберите Хранилище>Учетные записи хранения. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- На странице Учетные записи хранения найдите с помощью фильтра свою учетную запись хранения (при необходимости), а затем выберите эту учетную запись.
Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
Создание контейнера BLOB-объектов
В этом разделе вы создадите контейнер больших двоичных объектов с именем adftutorial в хранилище BLOB-объектов Azure.
На странице учетной записи хранения выберите Общие сведения>Контейнеры.
На панели инструментов страницы <Имя учетной записи> - Контейнеры выберите Контейнер.
В диалоговом окне Создание контейнера введите adftutorial в качестве имени и щелкните ОК. Страница <Имя учетной записи> - Контейнеры будет обновлена, и в списке появится контейнер adftutorial.
Добавление входной папки и файла для контейнера BLOB-объектов
В этом разделе показано, как создать папку с именем input в созданном вами контейнере и отправить пример файла в эту папку. Прежде чем начать, откройте текстовый редактор, например Блокнот и создайте файл emp.txt с таким содержимым:
John, Doe
Jane, Doe
Сохраните файл в папкеC:\ADFv2QuickStartPSH. (Если папка не существует, создайте ее.) После этого вернитесь на портал Azure и выполните следующие действия.
На странице <Имя учетной записи> - Контейнеры, где вы остановились, в обновленном списке контейнеров выберите adftutorial.
- Если вы закрыли окно или перешли на другую страницу, снова войдите на портал Azure.
- В меню портала Azure выберите Все службы, а затем выберите Хранилище>Учетные записи хранения. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- Выберите свою учетную запись хранения, а затем выберите Контейнеры>adftutorial.
На панели инструментов adftutorial страницы контейнера выберите Отправка.
На странице Отправка BLOB-объектов выберите поле Файлы, а затем найдите и выберите файл emp.txt.
Разверните заголовок Дополнительно. Теперь страница отображается, как показано ниже:
В поле Отправить в папку введите input.
Нажмите кнопку Отправить. В списке должен отобразиться файл emp.txt с состоянием отправки.
Щелкните Закрыть значок (X), чтобы закрыть страницу Отправка BLOB-объектов.
Не закрывайте страницу контейнера adftutorial. Она понадобится для проверки выходных данных в конце этого руководства.
Visual Studio
В этом пошаговом руководстве используется Visual Studio 2019. Процедуры в Visual Studio 2013, 2015 или 2017 незначительно отличаются.
Создание приложения в Azure Active Directory
В разделе Практическое руководство. Создание приложения Azure Active Directory и субъекта-службы с доступом к ресурсам с помощью портала следуйте инструкциям по выполнению этих задач.
- В разделе Создание приложения Azure Active Directory создайте приложение, представляющее приложение .NET, создаваемое в этом руководстве. В качестве URL-адреса входа можно указать фиктивный URL-адрес, как показано в статье (
https://contoso.org/exampleapp
). - В разделе Получение значений для входа получите код приложения, а также идентификатор арендатора и запишите эти значения. Они потребуются в дальнейшем при выполнении инструкций этого руководства.
- В разделе Сертификаты и секреты получите ключ аутентификации и запишите это значение. Оно потребуется в дальнейшем при выполнении инструкций этого руководства.
- В разделе Назначение приложению роли назначьте приложению роль Участник на уровне подписки, чтобы приложение могло создавать фабрики данных в подписке.
Создание проекта Visual Studio
Далее создайте в Visual Studio консольное приложение C# .NET.
- Запустите Visual Studio.
- В окне запуска выберите Создать новый проект>Консольное приложение (платформа .NET Framework) . Требуется .NET версии 4.5.2 или более поздней.
- В окне Имя проекта введите ADFv2QuickStart.
- Выберите Создать, чтобы создать проект.
Установка пакетов Nuget
Выберите Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.
На панели Консоль диспетчера пакетов выполните следующие команды, чтобы установить пакеты. Дополнительные сведения см . в пакете NuGet Microsoft.Azure.Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.Identity.Client
Создание клиента фабрики данных
Откройте файл Program.cs и включите следующие инструкции, чтобы добавить ссылки на пространства имен.
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.Identity.Client;
Добавьте следующий код в метод Main, который задает следующие переменные. Замените значения заполнителей на собственные. Чтобы получить список регионов Azure, в которых сейчас доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, база данных SQL Azure и многие другие) и вычисления (HDInsight и другие), используемые фабрикой данных, могут располагаться в других регионах.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID where the data factory resides>"; string resourceGroup = "<your resource group where the data factory resides>"; string region = "<the location of your resource group>"; string dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"; string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; // specify the container and input folder from which all files // need to be copied to the output folder. string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. containername/inputdir>"; //specify the contains and output folder where the files are copied string outputBlobPath = "<the blob path to copy data to, e.g. containername/outputdir>"; // name of the Azure Storage linked service, blob dataset, and the pipeline string storageLinkedServiceName = "AzureStorageLinkedService"; string blobDatasetName = "BlobDataset"; string pipelineName = "Adfv2QuickStartPipeline";
Примечание
Для национального облака необходимо использовать соответствующие конечные точки для ActiveDirectoryAuthority и ResourceManagerUrl (BaseUri). Например, в Azure US Gov используйте https://login.microsoftonline.us вместо https://login.microsoftonline.com и https://management.usgovcloudapi.net вместо https://management.azure.com/ , а затем создайте клиент управления фабрикой данных. С помощью PowerShell можно легко получить URL-адреса конечных точек для различных облаков, выполнив команду Get-AzEnvironment | Format-List, которая возвращает список конечных точек для каждой облачной среды.
Добавьте в метод Main приведенный ниже код, создающий экземпляр класса DataFactoryManagementClient. Этот объект используется не только для создания фабрики данных, связанной службы, наборов данных и конвейера, но и для отслеживания подробностей выполнения конвейера.
// Authenticate and create a data factory management client IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(applicationId) .WithAuthority("https://login.microsoftonline.com/" + tenantID) .WithClientSecret(authenticationKey) .WithLegacyCacheCompatibility(false) .WithCacheOptions(CacheOptions.EnableSharedCacheOptions) .Build(); AuthenticationResult result = await app.AcquireTokenForClient( new string[]{ "https://management.azure.com//.default"}) .ExecuteAsync(); ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Создание фабрики данных
Добавьте следующий код, создающий фабрику данных, в метод Main.
// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));
while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
"PendingCreation")
{
System.Threading.Thread.Sleep(1000);
}
Создание связанной службы
Добавьте следующий код, создающий связанную службу хранилища Azure, в метод Main.
Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В этом руководстве необходимо создать одну связанную службу хранилища Azure для копирования хранилища-источника и приемника. В примере она называется AzureStorageLinkedService.
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
storageLinkedService, client.SerializationSettings));
Создание набора данных
Добавьте следующий код, который создает набор данных большого двоичного объекта Azure, в метод Main.
Вы можете определить набор данных, который будет представлять данные для копирования из источника в приемник. В этом примере набор данных большого двоичного объекта относится к связанным службам хранилища Azure, созданным на предыдущем шаге. Набор данных принимает параметр, значение которого задается в действии, использующем набор данных. Параметр используется для создания folderPath, указывающего, где находятся или хранятся данные.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
},
FolderPath = new Expression { Value = "@{dataset().path}" },
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "path", new ParameterSpecification { Type = ParameterType.String } }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
Создание конвейера
Добавьте в метод Main следующий код, создающий конвейер с действием копирования.
В этом примере этот конвейер содержит одно действие и принимает два параметра: путь входного и выходного большого двоичного объекта. Значения для этих параметров устанавливаются при активации или выполнении конвейера. Действие копирования ссылается на тот же набор данных большого двоичного объекта, который был создан на предыдущем шаге, в качестве входного и выходного. Если набор данных используется в качестве входного, указывается путь к входным данным. И если набор данных используется в качестве выходного, указывается путь к выходным данным.
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "inputPath", new ParameterSpecification { Type = ParameterType.String } },
{ "outputPath", new ParameterSpecification { Type = ParameterType.String } }
},
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToBlob",
Inputs = new List<DatasetReference>
{
new DatasetReference()
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.inputPath" }
}
}
},
Outputs = new List<DatasetReference>
{
new DatasetReference
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.outputPath" }
}
}
},
Source = new BlobSource { },
Sink = new BlobSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));
Создание конвейера
Добавьте в метод Main следующий код, активирующий выполнение конвейера.
Этот код также указывает параметрам inputPath и outputPath, задаваемым в конвейере, фактические значения путей большого двоичного объекта источника и приемника.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
{ "inputPath", inputBlobPath },
{ "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Мониторинг выполнения конвейера
Добавьте следующий код в метод Main, чтобы постоянно проверять состояние до завершения копирования данных.
// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued") System.Threading.Thread.Sleep(15000); else break; }
Добавьте в метод Main следующий код, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.
// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams); if (pipelineRun.Status == "Succeeded") Console.WriteLine(queryResponse.Value.First().Output); else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Выполнение кода
Создайте и запустите приложение, а затем проверьте выполнение конвейера.
Консоль выведет ход выполнения создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления сведений о действии копирования с размером данных для чтения или записи. Затем воспользуйтесь такими средствами, как Обозреватель службы хранилища Azure, чтобы проверить, скопированы ли большие двоичные объекты в outputBlobPath из inputBlobPath, как указано в переменных.
Пример выходных данных
Creating data factory SPv2Factory0907...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
"type": "SecureString"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"value": "@{dataset().path}",
"type": "Expression"
}
},
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
},
"parameters": {
"path": {
"type": "String"
}
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.inputPath"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.outputPath"
},
"type": "DatasetReference"
}
],
"name": "CopyFromBlobToBlob"
}
],
"parameters": {
"inputPath": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
"dataRead": 331452208,
"dataWritten": 331452208,
"copyDuration": 23,
"throughput": 14073.209,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 23
}
Press any key to exit...
Проверка выходных данных
Конвейер автоматически создает выходную папку в контейнере больших двоичных объектов adftutorial. Затем он копирует файл emp.txt из входной папки в выходную.
- На портале Azure на странице контейнера adftutorial в разделе Добавление входной папки и файла для контейнера больших двоичных объектов, описанном выше, выберите Обновить, чтобы просмотреть выходную папку.
- В списке папок щелкните output в списке папок.
- Убедитесь, что файл emp.txt скопирован в папку output.
Очистка ресурсов
Чтобы программно удалить фабрики данных, добавьте следующие строки кода в программу:
Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);
Дальнейшие действия
В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Перейдите к руководствам, чтобы узнать об использовании фабрики данных в различных сценариях.