Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ПРИМЕНИМО К:
Azure Data Factory
Azure Synapse Analytics
Совет
Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.
В этом кратком руководстве описывается, как использовать пакет SDK .NET для создания Azure Data Factory. Конвейер, создаваемый в этой фабрике данных, копирует данные из одной папки в другую в хранилище BLOB-объектов Azure. Руководство по преобразованию данных с помощью Azure Data Factory см. в разделе Учебник: преобразование данных с помощью Spark.
Предварительные требования
подписка Azure
Если у вас нет подписки Azure, создайте учетную запись free перед началом работы.
роли Azure
Чтобы создать экземпляры Data Factory, учетная запись пользователя, используемая для входа в Azure, должна быть членом роли contributor или owner, или administrator подписки Azure. Чтобы просмотреть разрешения, которые есть в подписке, перейдите на портал Azure выберите имя пользователя в правом верхнем углу, выберите "... значок " для получения дополнительных параметров и выберите Ми разрешения. Если у вас есть доступ к нескольким подпискам, выберите соответствующую подписку.
Чтобы создавать дочерние ресурсы для службы "Фабрика данных", в том числе наборы данных, связанные службы, конвейеры, триггеры и среды выполнения интеграции, а также управлять ими, выполните следующие требования:
- Чтобы создавать и управлять дочерними ресурсами на портале Azure, необходимо принадлежать к роли Участник Data Factory на уровне группы ресурсов или выше.
- Чтобы создавать дочерние ресурсы и управлять ими с помощью PowerShell или пакета SDK, достаточно роли Участник на уровне ресурса или выше.
Примеры инструкций о том, как добавить пользователя к роли, см. в статье Добавление ролей.
Дополнительные сведения см. в следующих статьях:
учетная запись Azure Storage
Вы используете учетную запись Azure Storage общего назначения (в частности BLOB-хранилище) как хранилища данных источника и назначения в этом кратком руководстве. Если у вас нет учетной записи Azure Storage общего назначения, см. статью Создание учетной записи хранения, чтобы создать её.
Получение имени учетной записи хранения
Для этого краткого руководства вам потребуется имя учетной записи Azure Storage. Процедура ниже предоставляет шаги для получения имени вашей учетной записи хранения.
- В веб-браузере перейдите на портал Azure и войдите с помощью имени пользователя и пароля Azure.
- В меню портала Azure выберите Все услуги, затем выберите Хранилище и >. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- На странице Учетные записи хранения найдите с помощью фильтра свою учетную запись хранения (при необходимости), а затем выберите эту учетную запись.
Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
Создать контейнер больших двоичных объектов
В этом разделе описано, как создать контейнер BLOB-объектов с именем adftutorial в хранилище BLOB-объектов Azure.
На странице учетной записи хранения выберите Общие сведения>Контейнеры.
На панели инструментов страницы <Имя учетной записи> - Контейнеры выберите Контейнер.
В диалоговом окне Создание контейнера введите adftutorial в качестве имени и щелкните ОК. Страница <Имя учетной записи> - Контейнеры будет обновлена, и в списке появится контейнер adftutorial.
Добавьте входную папку и файл для контейнера BLOB-объектов.
В этом разделе показано, как создать папку с именем input в созданном вами контейнере и отправить пример файла в эту папку. Прежде чем начать, откройте текстовый редактор, например Блокнот и создайте файл emp.txt с таким содержимым:
John, Doe
Jane, Doe
Сохраните файл в папкеC:\ADFv2QuickStartPSH. (Если папка еще не существует, создайте ее.) Затем вернитесь на портал Azure и выполните следующие действия:
На странице <Имя учетной записи> - Контейнеры, на которой вы ранее остановились, выберите adftutorial из обновленного списка контейнеров.
- Если вы закрыли окно или перешли на другую страницу, снова войдите на портал Azure.
- В меню портала Azure выберите Все услуги, затем выберите Хранилище и >. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- Выберите свою учетную запись хранения, а затем выберите Контейнеры>adftutorial.
На панели инструментов adftutorial страницы контейнера выберите Отправка.
На странице Загрузка BLOB-объектов выберите поле Файлы, а затем перейдите к файлу emp.txt и выберите его.
Разверните заголовок Дополнительно. Теперь страница отображается, как показано ниже:
В поле Отправить в папку введите input.
Нажмите кнопку Отправить. В списке должен отобразиться файл emp.txt с состоянием отправки.
Щелкните значок Закрыть (крестик X), чтобы закрыть страницу Загрузка BLOB-объекта.
Не закрывайте страницу контейнера adftutorial. Вы используете это, чтобы проверить выходные данные в конце этого быстрого старта.
Visual Studio
В пошаговом руководстве в этой статье используется Visual Studio 2019. Процедуры Visual Studio 2013, 2015 или 2017 немного отличаются.
Создание приложения в Microsoft Entra ID
В разделах Как: Использовать портал для создания приложения Microsoft Entra и учетной записи службы, которая может получать доступ к ресурсам следуйте инструкциям для выполнения следующих задач:
- В Создание приложения Microsoft Entra создайте приложение, которое представляет собой .NET-приложение, разрабатываемое в данном учебном руководстве. В качестве URL-адреса входа можно указать фиктивный URL-адрес, как показано в статье (
https://contoso.org/exampleapp). - В разделе Получение значений для входа получите код приложения, а также идентификатор арендатора и запишите эти значения. Они потребуются в дальнейшем при выполнении инструкций этого руководства.
- В разделе Сертификаты и секреты получите ключ аутентификации и запишите это значение. Оно потребуется в дальнейшем при выполнении инструкций этого руководства.
- В разделе Назначение приложению роли назначьте приложению роль Участник на уровне подписки, чтобы приложение могло создавать фабрики данных в рамках подписки.
Создание проекта Visual Studio
Затем создайте консольное приложение C# .NET в Visual Studio:
- Запустите Visual Studio.
- В окне "Пуск" выберите Создать новый проект>Консольное приложение (.NET Framework). требуется .NET версии 4.5.2 или более поздней.
- В Название проекта введите ADFv2QuickStart.
- Выберите Создать, чтобы создать проект.
Установка пакетов Nuget
Выберите Tools>NuGet Package Manager>Package Manager Console.
В панели консоль Package Manager выполните следующие команды для установки пакетов. Дополнительные сведения см. в пакете NuGet Azure.ResourceManager.DataFactory.
Install-Package Azure.ResourceManager.DataFactory -IncludePrerelease Install-Package Azure.Identity
Создание фабрики данных
Откройте Program.cs и добавьте следующие инструкции, чтобы добавить ссылки на пространства имен.
using Azure; using Azure.Core; using Azure.Core.Expressions.DataFactory; using Azure.Identity; using Azure.ResourceManager; using Azure.ResourceManager.DataFactory; using Azure.ResourceManager.DataFactory.Models; using Azure.ResourceManager.Resources; using System; using System.Collections.Generic;Добавьте следующий код в метод Main, который задает следующие переменные. Замените значения заполнителей на собственные. В списке регионов Azure, в которых в настоящее время доступен Data Factory, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Analytics, чтобы найти Data Factory: Продукты, доступные по регионам. Хранилища данных (Azure Storage, Azure SQL Database и многое другое) и вычислительные ресурсы (HDInsight и другие), используемые фабрикой данных, могут находиться в других регионах.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID where the data factory resides>"; string resourceGroup = "<your resource group where the data factory resides>"; string region = "<the location of your resource group>"; string dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"; string storageAccountName = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; // specify the container and input folder from which all files // need to be copied to the output folder. string inputBlobContainer = "<blob container to copy data from, e.g. containername>"; string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. inputdir/file>"; //specify the contains and output folder where the files are copied string outputBlobContainer = "<blob container to copy data from, e.g. containername>"; string outputBlobPath = "<the blob path to copy data to, e.g. outputdir/file>"; // name of the Azure Storage linked service, blob dataset, and the pipeline string storageLinkedServiceName = "AzureStorageLinkedService"; string blobDatasetName = "BlobDataset"; string pipelineName = "Adfv2QuickStartPipeline";Добавьте следующий код, создающий фабрику данных, в метод Main.
ArmClient armClient = new ArmClient( new ClientSecretCredential(tenantID, applicationId, authenticationKey, new TokenCredentialOptions { AuthorityHost = AzureAuthorityHosts.AzurePublicCloud }), subscriptionId, new ArmClientOptions { Environment = ArmEnvironment.AzurePublicCloud } ); ResourceIdentifier resourceIdentifier = SubscriptionResource.CreateResourceIdentifier(subscriptionId); SubscriptionResource subscriptionResource = armClient.GetSubscriptionResource(resourceIdentifier); Console.WriteLine("Get an existing resource group " + resourceGroupName + "..."); var resourceGroupOperation = subscriptionResource.GetResourceGroups().Get(resourceGroupName); ResourceGroupResource resourceGroupResource = resourceGroupOperation.Value; Console.WriteLine("Create a data factory " + dataFactoryName + "..."); DataFactoryData dataFactoryData = new DataFactoryData(AzureLocation.EastUS2); var dataFactoryOperation = resourceGroupResource.GetDataFactories().CreateOrUpdate(WaitUntil.Completed, dataFactoryName, dataFactoryData); Console.WriteLine(dataFactoryOperation.WaitForCompletionResponse().Content); // Get the data factory resource DataFactoryResource dataFactoryResource = dataFactoryOperation.Value;
Создание связанной службы
Добавьте следующий код в метод Main, который создает связанную службу Azure Storage.
Вы создаете связанные службы в фабрике данных, чтобы связать свои хранилища данных и вычислительные службы с фабрикой данных. В этом кратком руководстве необходимо создать только одну связанную службу Azure Blob Storage как для источника копирования, так и для хранилища-назначения. В этом примере она называется AzureBlobStorageLinkedService.
// Create an Azure Storage linked service
Console.WriteLine("Create a linked service " + storageLinkedServiceName + "...");
AzureBlobStorageLinkedService azureBlobStorage = new AzureBlobStorageLinkedService()
{
ConnectionString = azureBlobStorageConnectionString
};
DataFactoryLinkedServiceData linkedServiceData = new DataFactoryLinkedServiceData(azureBlobStorage);
var linkedServiceOperation = dataFactoryResource.GetDataFactoryLinkedServices().CreateOrUpdate(WaitUntil.Completed, storageLinkedServiceName, linkedServiceData);
Console.WriteLine(linkedServiceOperation.WaitForCompletionResponse().Content);
Создание набора данных
Добавьте следующий код в метод Main, который создает набор данных в текстовом формате с разделителями.
Вы определяете набор данных, представляющий данные для копирования из источника в приемник. В этом примере этот набор данных с разделенным текстом ссылается на подключенную службу Azure Blob Storage, созданную на предыдущем шаге. Набор данных принимает два параметра, значение которого задано в действии, использующее набор данных. Параметры используются для создания "контейнера" и пути к папке "folderPath", указывающего на место хранения данных.
// Create an Azure Blob dataset
DataFactoryLinkedServiceReference linkedServiceReference = new DataFactoryLinkedServiceReference(DataFactoryLinkedServiceReferenceType.LinkedServiceReference, storageLinkedServiceName);
DelimitedTextDataset delimitedTextDataset = new DelimitedTextDataset(linkedServiceReference)
{
DataLocation = new AzureBlobStorageLocation
{
Container = DataFactoryElement<string>.FromExpression("@dataset().container"),
FileName = DataFactoryElement<string>.FromExpression("@dataset().path")
},
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("container",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("path",new EntityParameterSpecification(EntityParameterType.String))
},
FirstRowAsHeader = false,
QuoteChar = "\"",
EscapeChar = "\\",
ColumnDelimiter = ","
};
DataFactoryDatasetData datasetData = new DataFactoryDatasetData(delimitedTextDataset);
var datasetOperation = dataFactoryResource.GetDataFactoryDatasets().CreateOrUpdate(WaitUntil.Completed, blobDatasetName, datasetData);
Console.WriteLine(datasetOperation.WaitForCompletionResponse().Content);
Создание конвейера
Добавьте в метод Main следующий код, создающий конвейер с действием копирования.
В этом примере этот конвейер содержит одно действие и принимает четыре параметра: входной контейнер blob и путь, а также выходной контейнер blob и путь. Значения для этих параметров устанавливаются при активации или выполнении конвейера. Действие копирования ссылается на тот же набор данных BLOB, созданный на предыдущем шаге, в качестве входных и выходных данных. Если набор данных используется в качестве входного набора данных, указывается входной контейнер и путь. При использовании набора данных в качестве выходного набора данных указывается выходной контейнер и путь.
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
DataFactoryPipelineData pipelineData = new DataFactoryPipelineData()
{
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("inputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("inputPath",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputPath",new EntityParameterSpecification(EntityParameterType.String))
},
Activities =
{
new CopyActivity("CopyFromBlobToBlob",new DataFactoryBlobSource(),new DataFactoryBlobSink())
{
Inputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.inputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.inputPath\""))
}
}
},
Outputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.outputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.outputPath\""))
}
}
}
}
}
};
var pipelineOperation = dataFactoryResource.GetDataFactoryPipelines().CreateOrUpdate(WaitUntil.Completed, pipelineName, pipelineData);
Console.WriteLine(pipelineOperation.WaitForCompletionResponse().Content);
Создание конвейера
Добавьте в метод Main следующий код, активирующий выполнение конвейера.
Этот код также задает значения параметров inputContainer, inputPath, outputContainer и outputPath, указанных в конвейере, фактическими значениями путей к исходным и целевым BLOB-объектам.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, BinaryData> parameters = new Dictionary<string, BinaryData>()
{
{ "inputContainer",BinaryData.FromObjectAsJson(inputBlobContainer) },
{ "inputPath",BinaryData.FromObjectAsJson(inputBlobPath) },
{ "outputContainer",BinaryData.FromObjectAsJson(outputBlobContainer) },
{ "outputPath",BinaryData.FromObjectAsJson(outputBlobPath) }
};
var pipelineResource = dataFactoryResource.GetDataFactoryPipeline(pipelineName);
var runResponse = pipelineResource.Value.CreateRun(parameters);
Console.WriteLine("Pipeline run ID: " + runResponse.Value.RunId);
Мониторинг запуска конвейера
Добавьте следующий код в метод Main, чтобы постоянно проверять состояние до завершения копирования данных.
// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); DataFactoryPipelineRunInfo pipelineRun; while (true) { pipelineRun = dataFactoryResource.GetPipelineRun(runResponse.Value.RunId.ToString()); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued") System.Threading.Thread.Sleep(15000); else break; }Добавьте в метод Main следующий код, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.
// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); var queryResponse = dataFactoryResource.GetActivityRun(pipelineRun.RunId.ToString(), new RunFilterContent(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10))); var enumerator = queryResponse.GetEnumerator(); enumerator.MoveNext(); if (pipelineRun.Status == "Succeeded") Console.WriteLine(enumerator.Current.Output); else Console.WriteLine(enumerator.Current.Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Выполнение кода
Создайте и запустите приложение, а затем проверьте выполнение конвейера.
Консоль отображает прогресс создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь отображения сведений о выполнении операции копирования с указанием объема данных на чтение/запись. Затем используйте такие средства, как Azure Storage Explorer, чтобы проверить, что BLOB-объекты были скопированы в outputBlobPath из inputBlobPath, как указано в переменных.
Пример полученных результатов
Create a data factory quickstart-adf...
{
"name": "quickstart-adf",
"type": "Microsoft.DataFactory/factories",
"properties": {
"provisioningState": "Succeeded",
"version": "2018-06-01"
},
"location": "eastus2"
}
Create a linked service AzureBlobStorage...
{
"name": "AzureBlobStorage",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;",
"encryptedCredential": "<encryptedCredential>"
}
}
}
Creating dataset BlobDelimitedDataset...
{
"name": "BlobDelimitedDataset",
"type": "Microsoft.DataFactory/factories/datasets",
"properties": {
"type": "DelimitedText",
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureBlobStorage"
},
"parameters": {
"container": {
"type": "String"
},
"path": {
"type": "String"
}
},
"typeProperties": {
"location": {
"container": {
"type": "Expression",
"value": "@dataset().container"
},
"type": "AzureBlobStorageLocation",
"fileName": {
"type": "Expression",
"value": "@dataset().path"
}
},
"columnDelimiter": ",",
"quoteChar": "\"",
"escapeChar": "\\",
"firstRowAsHeader": false
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.inputContainer",
"path": "@pipeline().parameters.inputPath"
}
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.outputContainer",
"path": "@pipeline().parameters.outputPath"
}
}
],
"name": "CopyFromBlobToBlob",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
}
}
],
"parameters": {
"inputContainer": {
"type": "String"
},
"inputPath": {
"type": "String"
},
"outputContainer": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 3aa26ffc-5bee-4db9-8bac-ccbc2d7b51c1
Checking pipeline run status...
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 1048,
"dataWritten": 1048,
"filesRead": 1,
"filesWritten": 1,
"sourcePeakConnections": 1,
"sinkPeakConnections": 1,
"copyDuration": 8,
"throughput": 1.048,
"errors": [],
"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
"usedDataIntegrationUnits": 4,
"billingReference": {
"activityType": "DataMovement",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
],
"totalBillableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
]
},
"usedParallelCopies": 1,
"executionDetails": [
{
"source": {
"type": "AzureBlobStorage"
},
"sink": {
"type": "AzureBlobStorage"
},
"status": "Succeeded",
"start": "2023-12-15T10:25:33.9991558Z",
"duration": 8,
"usedDataIntegrationUnits": 4,
"usedParallelCopies": 1,
"profile": {
"queue": {
"status": "Completed",
"duration": 5
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"listingSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"readingFromSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureBlobStorage",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 5,
"transferDuration": 1
}
}
],
"dataConsistencyVerification": {
"VerificationResult": "NotVerified"
}
}
Press any key to exit...
Проверка выходных данных
Конвейер автоматически создает папку для вывода в BLOB-контейнере adftutorial. Затем он копирует файл emp.txt из входной папки в выходную.
- На портале Azure на странице контейнера adftutorial, на которой вы остановились в разделе Добавление входной папки и файла для контейнера BLOB-объектов выше, выберите Обновить, чтобы просмотреть выходную папку.
- В списке папок выберите output.
- Убедитесь, что файл emp.txt скопирован в папку output.
Очистка ресурсов
Чтобы программно удалить фабрику данных, добавьте следующие строки кода в программу:
Console.WriteLine("Deleting the data factory");
dataFactoryResource.Delete(WaitUntil.Completed);
Следующие шаги
Pipeline в этом примере копирует данные из одного места в другое место в хранилище блобов Azure. Перейдите к руководствам, чтобы узнать об использовании фабрики данных в различных сценариях.