Inicio rápido: Creación de una factoría de datos y una canalización con SDK de .NET

SE APLICA A: Azure Data Factory Azure Synapse Analytics

En este inicio rápido se describe cómo usar SDK de .NET para crear una instancia de Azure Data Factory. La canalización que ha creado en esta factoría de datos copia los datos de una carpeta a otra en Azure Blob Storage. Para ver un tutorial acerca de cómo transformar datos mediante Azure Data Factory, consulte Tutorial: Transformación de datos con Spark.

Nota

En este artículo no se ofrece una introducción detallada al servicio Data Factory. Para ver una introducción al servicio Azure Data Factory, consulte Introducción al servicio Azure Data Factory.

Requisitos previos

Suscripción de Azure

Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.

Roles de Azure

Para crear instancias de Data Factory, la cuenta de usuario que use para iniciar sesión en Azure debe ser un miembro de los roles colaborador o propietario, o de administrador de la suscripción de Azure. Para ver los permisos que tiene en la suscripción, vaya a Azure Portal, seleccione su nombre de usuario en la esquina superior derecha, seleccione el icono " ... " para ver más opciones y, después, seleccione Mis permisos. Si tiene acceso a varias suscripciones, elija la correspondiente.

Para crear y administrar recursos secundarios para Data Factory incluidos los conjuntos de datos, servicios vinculados, canalizaciones, desencadenadores y entornos de ejecución de integración, se aplican los siguientes requisitos:

  • Para crear y administrar recursos secundarios en Azure Portal, debe pertenecer al rol Colaborador de Data Factory en el nivel de grupo de recursos u otro nivel superior.
  • Para crear y administrar recursos secundarios con Powershell o el SDK, el rol de Colaborador en el nivel de recurso u otro nivel superior es suficiente.

Para obtener instrucciones de ejemplo sobre cómo agregar un usuario a un rol, consulte el artículo sobre la adición de roles.

Para más información, consulte los siguientes artículos:

Cuenta de Azure Storage

En esta guía de inicio rápido, use una cuenta de Azure Storage (en concreto Blob Storage) de uso general como almacén de datos de origen y destino. Si no dispone de una cuenta de Azure Storage de uso general, consulte el artículo Creación de una cuenta de almacenamiento, donde se indica cómo crearla.

Obtención del nombre de la cuenta de almacenamiento

En este inicio rápido, necesita el nombre de su cuenta de Azure Storage. El siguiente procedimiento especifica los pasos necesarios para obtener el nombre de una cuenta de almacenamiento:

  1. En un explorador web, vaya a Azure Portal e inicie sesión con su nombre de usuario y contraseña de Azure.
  2. En el menú de Azure Portal, seleccione Todos los servicios y, a continuación, seleccione Almacenamiento>Cuentas de almacenamiento. También puede buscar y seleccionar cuentas de almacenamiento desde cualquier página.
  3. En la página Cuentas de Storage, filtre por su cuenta de almacenamiento (si fuera necesario) y, después, seleccione su cuenta de Storage.

También puede buscar y seleccionar cuentas de almacenamiento desde cualquier página.

Creación de un contenedor de blobs

En esta sección se crea un contenedor de blobs denominado adftutorial en la instancia de Azure Blob Storage.

  1. En la página de la cuenta de almacenamiento, seleccione Información general>Contenedores.

  2. En la barra de herramientas de la página <Nombre de cuenta> - Contenedores, seleccione Contenedor.

  3. En el cuadro de diálogo Nuevo contenedor, escriba adftutorial para el nombre y seleccione Aceptar. La página <Nombre de cuenta> - Contenedores se actualiza para incluir adftutorial en la lista de contenedores.

    List of containers

Agregar una carpeta de entrada y un archivo para el contenedor de blobs

En esta sección, creará una carpeta denominada entrada en el contenedor que creó y cargará un archivo de ejemplo en dicha carpeta. Antes de empezar, abra un editor de texto, como el Bloc de notas, y cree un archivo denominado emp.txt con el siguiente contenido:

John, Doe
Jane, Doe

Guarde el archivo en la carpeta C:\ADFv2QuickStartPSH. (Si la carpeta no existe, créela). A continuación, vuelva a Azure Portal y siga estos pasos:

  1. En la página <Nombre de cuenta> - Contenedores, en el punto donde se quedó, seleccione adftutorial en la lista actualizada de contenedores.

    1. Si ha cerrado la ventana o ha pasado a otra página; inicie sesión de nuevo en Azure Portal.
    2. En el menú de Azure Portal, seleccione Todos los servicios y, a continuación, seleccione Almacenamiento>Cuentas de almacenamiento. También puede buscar y seleccionar cuentas de almacenamiento desde cualquier página.
    3. Seleccione la cuenta de almacenamiento y, después, seleccione Contenedores>adftutorial.
  2. En la barra de herramientas de la página del contenedor adftutorial, seleccione Cargar.

  3. En la página Cargar blob, seleccione Archivos y, a continuación, busque y seleccione el archivo emp.txt.

  4. Expanda el título Avanzado. La página aparece ahora como a continuación:

    Select Advanced link

  5. En el cuadro Cargar en carpeta, escriba input.

  6. Seleccione el botón Cargar. Debería ver el archivo emp.txt y el estado de la carga en la lista.

  7. Seleccione el icono Cerrar (X) para cerrar la página Cargar blob.

Mantenga abierta la página del contenedor adftutorial. Úsela para comprobar la salida al final de esta guía de inicio rápido.

Visual Studio

En el tutorial de este artículo se usa Visual Studio 2019. Los procedimientos para Visual Studio 2013, 2015 o 2017 son ligeramente diferentes.

Creación de una aplicación en Azure Active Directory

En las secciones de Instrucciones: Uso del portal para crear una aplicación de Azure AD y una entidad de servicio con acceso a los recursos, siga las instrucciones para realizar las siguientes tareas:

  1. En Crear una aplicación de Azure Active Directory, cree una aplicación que represente la aplicación .NET que se va a crear en este tutorial. Para la dirección URL de inicio de sesión, puede proporcionar una dirección URL ficticia, tal como se muestra en el artículo (https://contoso.org/exampleapp).
  2. En Obtener valores para iniciar sesión, obtenga el identificador de aplicación y el identificador de inquilino y tome nota de estos valores que se usa más adelante en este tutorial.
  3. En Certificados y secretos, obtenga la clave de autenticación y tome nota de este valor que se usa más adelante en este tutorial.
  4. En Asignar la aplicación a un rol, asigne la aplicación al rol Colaborador en el nivel de suscripción para que la aplicación pueda crear factorías de datos en la suscripción.

Creación de un proyecto de Visual Studio

A continuación, cree una aplicación de consola .NET de C# en Visual Studio:

  1. Inicie Visual Studio.
  2. En la ventana Inicio, seleccione Crear un nuevo proyecto>Aplicación de consola (.NET Framework) . Se requiere .NET versión 4.5.2 o posterior.
  3. En Nombre del proyecto, escriba ADFv2QuickStart.
  4. Seleccione Crear para crear el proyecto.

Instalación de paquetes NuGet

  1. Seleccione Herramientas>Administrador de paquetes NuGet>Consola del Administrador de paquetes.

  2. En el panel Consola del Administrador de paquetes, ejecute los comandos siguientes para instalar los paquetes. Para obtener más información, consulte el paquete NuGet Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.Identity.Client
    

Creación de un cliente de factoría de datos

  1. Abra Program.cs e incluya las siguientes instrucciones para agregar referencias a espacios de nombres.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.Identity.Client;
    
  2. Agregue el siguiente código al método main que define las variables. Reemplace los marcadores de posición por sus propios valores. Para una lista de las regiones de Azure en las que Data Factory está disponible actualmente, seleccione las regiones que le interesen en la página siguiente y expanda Análisis para poder encontrar Data Factory: Productos disponibles por región. Los almacenes de datos (Azure Storage, Azure SQL Database, etc.) y los procesos (HDInsight, etc.) que usa la factoría de datos pueden encontrarse en otras regiones.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID where the data factory resides>";
    string resourceGroup = "<your resource group where the data factory resides>";
    string region = "<the location of your resource group>";
    string dataFactoryName = 
        "<specify the name of data factory to create. It must be globally unique.>";
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    // specify the container and input folder from which all files 
    // need to be copied to the output folder. 
    string inputBlobPath =
        "<path to existing blob(s) to copy data from, e.g. containername/inputdir>";
    //specify the contains and output folder where the files are copied
    string outputBlobPath =
        "<the blob path to copy data to, e.g. containername/outputdir>";
    
    // name of the Azure Storage linked service, blob dataset, and the pipeline
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string blobDatasetName = "BlobDataset";
    string pipelineName = "Adfv2QuickStartPipeline";
    

Nota

En el caso de nubes soberanas, debe usar los puntos de conexión adecuados específicos de la nube para ActiveDirectoryAuthority y ResourceManagerUrl (BaseUri). Por ejemplo, en el caso de la nube de Azure de Administración Pública de los Estados Unidos, debería usar la autoridad de https://login.microsoftonline.us en lugar de https://login.microsoftonline.com y https://management.usgovcloudapi.net en lugar de https://management.azure.com/ y, luego, crear el cliente de administración de la factoría de datos. Puede usar PowerShell para obtener fácilmente las direcciones URL de las diversas nubes ejecutando "Get-AzEnvironment | Format-List ", que devolverá una lista de puntos de conexión para cada entorno de nube.

  1. Agregue el código siguiente al método Main que crea una instancia de la clase DataFactoryManagementClient. Este objeto se usa para crear una factoría de datos, un servicio vinculado, conjuntos de datos y una canalización. También se usa para supervisar los detalles de ejecución de la canalización.

    // Authenticate and create a data factory management client
    IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(applicationId)
     .WithAuthority("https://login.microsoftonline.com/" + tenantID)
     .WithClientSecret(authenticationKey)
     .WithLegacyCacheCompatibility(false)
     .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
     .Build();
    
    AuthenticationResult result = await app.AcquireTokenForClient(
      new string[]{ "https://management.azure.com//.default"})
       .ExecuteAsync();
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) {
        SubscriptionId = subscriptionId };
    

Crear una factoría de datos

Agregue el código siguiente al método main que crea una factoría de datos.

// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
       "PendingCreation")
{
    System.Threading.Thread.Sleep(1000);
}

Creación de un servicio vinculado

Agregue el código siguiente al método main que crea un servicio vinculado de Azure Storage.

Los servicios vinculados se crean en una factoría de datos para vincular los almacenes de datos y los servicios de proceso con la factoría de datos. En esta guía de inicio rápido, basta con crear un servicio vinculado de Azure Storage para copiar el almacén de origen y el receptor. En el ejemplo, se denomina "AzureStorageLinkedService".

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey)
    }
);
client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
    storageLinkedService, client.SerializationSettings));

Crear un conjunto de datos

Agregue el código siguiente al método Main que crea un conjunto de datos de blob de Azure.

Se define un conjunto de datos que representa los datos que se copian de un origen a un receptor. En este ejemplo, este conjunto de datos de blob hace referencia al servicio vinculado de Azure Storage que creó en el paso anterior. En el conjunto de datos se usa un parámetro cuyo valor se establece en una actividad que consume el conjunto de datos. El parámetro se usa para construir la ruta "folderPath" que apunta a la ubicación o al almacén de los datos.

// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = new Expression { Value = "@{dataset().path}" },
        Parameters = new Dictionary<string, ParameterSpecification>
        {
            { "path", new ParameterSpecification { Type = ParameterType.String } }
        }
    }
);
client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));

Crear una canalización

Agregue el código siguiente al método main que crea una canalización con una actividad de copia.

En este ejemplo, esta canalización contiene una actividad y se usan dos parámetros: la ruta de acceso de blob de entrada y la de salida. Los valores para estos parámetros se establecen cuando se desencadena/ejecuta la canalización. La actividad de copia hace referencia al mismo conjunto de datos de blob creado en el paso anterior como entrada y salida. Cuando se usa el conjunto de datos como conjunto de datos de entrada, se especifica una ruta de acceso de entrada. Cuando se usa el conjunto de datos como conjunto de datos de salida, se especifica una ruta de acceso de salida.

// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "inputPath", new ParameterSpecification { Type = ParameterType.String } },
        { "outputPath", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToBlob",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference()
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.inputPath" }
                    }
                }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.outputPath" }
                    }
                }
            },
            Source = new BlobSource { },
            Sink = new BlobSink { }
        }
    }
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

Creación de una ejecución de canalización

Agregue el código siguiente al método Main que desencadena una ejecución de canalización.

Este código también establece los valores de los parámetros inputPath y outputPath especificados en la canalización con los valores reales de las rutas de acceso de blob de origen y receptor.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
    { "inputPath", inputBlobPath },
    { "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Supervisar una ejecución de canalización

  1. Agregue el código siguiente al método Main para comprobar continuamente el estado hasta que termine de copiar los datos.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Agregue el código siguiente al método Main que recupera detalles de la ejecución de actividad de copia, como el tamaño de los datos que se leen o escriben.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Ejecución del código

Compile e inicie la aplicación y, a continuación, compruebe la ejecución de la canalización.

La consola imprime el progreso de la creación de la factoría de datos, el servicio vinculado, los conjuntos de datos, la canalización y la ejecución de canalización. A continuación, comprueba el estado de la ejecución de canalización. Espere hasta que vea los detalles de ejecución de actividad de copia con el tamaño de los datos leídos o escritos. Luego, use herramientas como el Explorador de Azure Storage para comprobar que los blobs se copian a "outputBlobPath" desde "inputBlobPath", como especificó en las variables.

Salida de ejemplo

Creating data factory SPv2Factory0907...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
        "type": "SecureString"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@{dataset().path}",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "AzureStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "path": {
        "type": "String"
      }
    }
  }
}
Creating pipeline Adfv2QuickStartPipeline...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.inputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.outputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "name": "CopyFromBlobToBlob"
      }
    ],
    "parameters": {
      "inputPath": {
        "type": "String"
      },
      "outputPath": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
    "dataRead": 331452208,
    "dataWritten": 331452208,
    "copyDuration": 23,
    "throughput": 14073.209,
    "errors": [],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West US)",
    "usedDataIntegrationUnits": 2,
    "billedDuration": 23
}

Press any key to exit...

Comprobación del resultado

La canalización crea automáticamente la carpeta de salida en el contenedor de blobs adftutorial. A continuación, copia el archivo emp.txt de la carpeta de entrada a la carpeta de salida.

  1. En Azure Portal, en la página del contenedor adftutorial que detuvo en la sección anterior Agregar una carpeta de entrada y un archivo para el contenedor de blobs, seleccione Actualizar para ver la carpeta de salida.
  2. En la lista de carpetas, haga clic en output.
  3. Confirme que emp.txt se ha copiado en la carpeta de salida.

Limpieza de recursos

Para eliminar la factoría de datos mediante programación, agregue las líneas de código siguientes al programa:

Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);

Pasos siguientes

La canalización de este ejemplo copia los datos de una ubicación a otra en una instancia de Azure Blob Storage. Consulte los tutoriales para obtener información acerca del uso de Data Factory en otros escenarios.