Partilhar via


Atividades de ramificação e encadeamento num pipeline do Data Factory

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, você cria um pipeline do Data Factory que mostra alguns recursos de fluxo de controle. Esse pipeline copia de um contêiner no Armazenamento de Blobs do Azure para outro contêiner na mesma conta de armazenamento. Se a atividade de cópia for bem-sucedida, o pipeline enviará detalhes da operação de cópia bem-sucedida em um e-mail. Essas informações podem incluir a quantidade de dados gravados. Se a atividade de cópia falhar, ela enviará detalhes da falha de cópia, como a mensagem de erro, em um e-mail. Ao longo do tutorial, vai ver como passar os parâmetros.

Este gráfico fornece uma visão geral do cenário:

O diagrama mostra o Armazenamento de Blobs do Azure, que é o destino de uma cópia, que, em caso de sucesso, envia um email com detalhes ou, em caso de falha, envia um email com detalhes de erro.

Este tutorial mostra como executar as seguintes tarefas:

  • Criar uma fábrica de dados
  • Criar um serviço ligado do Armazenamento do Azure
  • Criar um conjunto de dados do Blob do Azure
  • Criar um pipeline que contém uma atividade de cópia e uma atividade da Web
  • Enviar saídas de atividades para atividades posteriores
  • Usar passagem de parâmetros e variáveis do sistema
  • Iniciar um execução de pipeline
  • Monitorizar o pipeline e execuções de atividades

Este tutorial utiliza o .NET SDK. Você pode usar outros mecanismos para interagir com o Azure Data Factory. Para guias de início rápido do Data Factory, consulte Inícios rápidos de 5 minutos.

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • Conta do Armazenamento do Azure. Você usa o armazenamento de blob como um armazenamento de dados de origem. Se você não tiver uma conta de armazenamento do Azure, consulte Criar uma conta de armazenamento.
  • Explorador de Armazenamento do Azure. Para instalar essa ferramenta, consulte Gerenciador de Armazenamento do Azure.
  • Base de Dados SQL do Azure. Pode utilizar a base de dados como um arquivo de dados sink. Se você não tiver um banco de dados no Banco de Dados SQL do Azure, consulte Criar um banco de dados no Banco de Dados SQL do Azure.
  • Visual Studio. Este artigo usa o Visual Studio 2019.
  • SDK do Azure .NET. Baixe e instale o SDK do Azure .NET.

Para obter uma lista das regiões do Azure nas quais o Data Factory está disponível no momento, consulte Produtos disponíveis por região. Os armazenamentos e cálculos de dados podem estar em outras regiões. As lojas incluem o Armazenamento do Azure e o Banco de Dados SQL do Azure. Os cálculos incluem o HDInsight, que o Data Factory usa.

Crie um aplicativo conforme descrito em Criar um aplicativo Microsoft Entra. Atribua o aplicativo à função de Colaborador seguindo as instruções no mesmo artigo. Você precisará de vários valores para partes posteriores deste tutorial, como ID do aplicativo (cliente) e ID do diretório (locatário).

Criar uma tabela de blob

  1. Abra um editor de texto. Copie o texto a seguir e salve-o localmente como input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Abra o Gerenciador de Armazenamento do Azure. Expanda a sua conta de armazenamento. Clique com o botão direito do rato em Contentores de Blobs e selecione Criar Contentor de Blobs.

  3. Nomeie o novo contêiner adfv2branch e selecione Carregar para adicionar seu arquivo de input.txt ao contêiner.

Criar projeto do Visual Studio

Crie um aplicativo de console .NET em C#:

  1. Inicie o Visual Studio e selecione Criar um novo projeto.
  2. Em Criar um novo projeto, escolha Aplicativo de Console (.NET Framework) para C# e selecione Avançar.
  3. Nomeie o projeto ADFv2BranchTutorial.
  4. Selecione .NET versão 4.5.2 ou superior e, em seguida, selecione Criar.

Instalar pacotes NuGet

  1. Selecione Ferramentas NuGet Package Manager Package Manager Console (Ferramentas>NuGet Package Manager>PackageManager Console).

  2. No Console do Gerenciador de Pacotes, execute os seguintes comandos para instalar pacotes. Consulte Microsoft.Azure.Management.DataFactory nuget package para obter detalhes.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Criar um cliente de fábrica de dados

  1. Abra Program.cs e adicione as seguintes instruções:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Adicione essas variáveis estáticas à Program classe. Substitua os marcadores de posição pelos seus próprios valores.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Adicione o seguinte código ao método Main. Esse código cria uma instância de DataFactoryManagementClient classe. Em seguida, use esse objeto para criar data factory, serviço vinculado, conjuntos de dados e pipeline. Você também pode usar esse objeto para monitorar os detalhes de execução do pipeline.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Criar uma fábrica de dados

  1. Adicione um CreateOrUpdateDataFactory método ao seu arquivo Program.cs :

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Adicione a seguinte linha ao Main método que cria um data factory:

    Factory df = CreateOrUpdateDataFactory(client);
    

Criar um serviço ligado do Armazenamento do Azure

  1. Adicione um StorageLinkedServiceDefinition método ao seu arquivo Program.cs :

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Adicione a seguinte linha ao Main método que cria um serviço vinculado do Armazenamento do Azure:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Para obter mais informações sobre propriedades e detalhes suportados, consulte Propriedades do serviço vinculado.

Criar conjuntos de dados

Nesta seção, você cria dois conjuntos de dados, um para a origem e outro para o coletor.

Criar um conjunto de dados para um Blob do Azure de origem

Adicione um método que cria um conjunto de dados de blob do Azure. Para obter mais informações sobre propriedades e detalhes com suporte, consulte Propriedades do conjunto de dados do Blob do Azure.

Adicione um SourceBlobDatasetDefinition método ao seu arquivo Program.cs :

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Defina um conjunto de dados que represente os dados de origem no Blob do Azure. Este conjunto de dados de Blob refere-se ao serviço vinculado do Armazenamento do Azure com suporte na etapa anterior. O conjunto de dados Blob descreve o local do blob do qual copiar: FolderPath e FileName.

Observe o uso de parâmetros para o FolderPath. sourceBlobContainer é o nome do parâmetro e a expressão é substituída pelos valores passados na execução do pipeline. A sintaxe para definir os parâmetros é @pipeline().parameters.<parameterName>

Criar um conjunto de dados para um coletor de Blob do Azure

  1. Adicione um SourceBlobDatasetDefinition método ao seu arquivo Program.cs :

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Adicione o código a seguir ao Main método que cria os conjuntos de dados de origem e coletor de Blob do Azure.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Criar uma classe C#: EmailRequest

Em seu projeto C#, crie uma classe chamada EmailRequest. Essa classe define quais propriedades o pipeline envia na solicitação de corpo ao enviar um email. Neste tutorial, o pipeline envia quatro propriedades do pipeline para o e-mail:

  • Mensagem. Corpo do e-mail. Para uma cópia bem-sucedida, essa propriedade contém a quantidade de dados gravados. Para uma cópia com falha, esta propriedade contém detalhes do erro.
  • Nome da fábrica de dados. Nome do data factory.
  • Nome do pipeline. Nome do pipeline.
  • Recetor. Parâmetro que passa. Esta propriedade especifica o recetor do e-mail.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Criar pontos finais de fluxo de trabalho de e-mail

Para acionar o envio de um email, use os Aplicativos Lógicos do Azure para definir o fluxo de trabalho. Para obter mais informações, consulte Criar um exemplo de fluxo de trabalho do aplicativo lógico de consumo.

Fluxo de trabalho de e-mail de êxito

No portal do Azure, crie um fluxo de trabalho de aplicativo lógico chamado CopySuccessEmail. Adicione o gatilho de solicitação chamado Quando uma solicitação HTTP for recebida. No gatilho Solicitação, preencha a caixa Esquema JSON do Corpo da Solicitação com o seguinte JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Seu fluxo de trabalho se parece com o exemplo a seguir:

Fluxo de trabalho de e-mail de êxito

Esse conteúdo JSON está alinhado com a EmailRequest classe criada na seção anterior.

Adicione a ação do Office 365 Outlook chamada Enviar um email. Para esta ação, personalize como você deseja formatar o e-mail, usando as propriedades passadas no esquema JSON do corpo da solicitação. Eis um exemplo:

Designer de fluxo de trabalho com a ação chamada Enviar um e-mail.

Depois de salvar o fluxo de trabalho, copie e salve o valor HTTP POST URL do gatilho.

Fluxo de trabalho de e-mail de falha

Clone o fluxo de trabalho do CopySuccessEmail aplicativo lógico em um novo fluxo de trabalho chamado CopyFailEmail. No gatilho Request, o esquema JSON do corpo da solicitação é o mesmo. Altere o formato do e-mail, como o Subject, para adaptar a um e-mail de falha. Segue-se um exemplo:

Designer de fluxo de trabalho e o fluxo de trabalho de email com falha.

Depois de salvar o fluxo de trabalho, copie e salve o valor HTTP POST URL do gatilho.

Agora você deve ter duas URLs de fluxo de trabalho, como os exemplos a seguir:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Criar um pipeline

Volte para seu projeto no Visual Studio. Agora adicionaremos o código que cria um pipeline com uma atividade de cópia e DependsOn propriedade. Neste tutorial, o pipeline contém uma atividade, uma atividade de cópia, que recebe o conjunto de dados Blob como uma fonte e outro conjunto de dados Blob como um coletor. Se a atividade de cópia for bem-sucedida ou falhar, ela chamará tarefas de email diferentes.

Neste pipeline, utiliza as seguintes funcionalidades:

  • Parâmetros
  • Atividade Web
  • Dependência das atividades
  • Usando a saída de uma atividade como entrada para outra atividade
  1. Adicione este método ao seu projeto. As seções a seguir fornecem mais detalhes.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Adicione a seguinte linha ao Main método que cria o pipeline:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parâmetros

A primeira seção do nosso código de pipeline define parâmetros.

  • sourceBlobContainer. O conjunto de dados de blob de origem consome esse parâmetro no pipeline.
  • sinkBlobContainer. O conjunto de dados de blob do coletor consome esse parâmetro no pipeline.
  • receiver. As duas atividades da Web no pipeline que enviam e-mails de sucesso ou falha para o destinatário usam esse parâmetro.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Atividade Web

A atividade da Web permite uma chamada para qualquer ponto de extremidade REST. Para obter mais informações sobre a atividade, consulte Atividade da Web no Azure Data Factory. Esse pipeline usa uma atividade da Web para chamar o fluxo de trabalho de email dos Aplicativos Lógicos. Você cria duas atividades da Web: uma que chama o CopySuccessEmail fluxo de trabalho e outra que chama o CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Url Na propriedade, cole os pontos de extremidade HTTP POST URL de seus fluxos de trabalho de Aplicativos Lógicos. Body Na propriedade, passe uma instância da EmailRequest classe. O pedido de e-mail contém as seguintes propriedades:

  • Mensagem. Passa o valor de @{activity('CopyBlobtoBlob').output.dataWritten. Acessa uma propriedade da atividade de cópia anterior e passa o valor de dataWritten. Para o caso de falha, passe a saída de erro em vez de @{activity('CopyBlobtoBlob').error.message.
  • Nome do Data Factory. Passa o valor de Esta variável de sistema permite que você acesse o nome de fábrica de @{pipeline().DataFactory} dados correspondente. Para obter uma lista de variáveis do sistema, consulte Variáveis do sistema.
  • Nome do pipeline. Passa o valor de @{pipeline().Pipeline}. Essa variável de sistema permite que você acesse o nome do pipeline correspondente.
  • Recetor. Passa o valor de "@pipeline().parameters.receiver". Acessa os parâmetros do pipeline.

Esse código cria uma nova dependência de atividade que depende da atividade de cópia anterior.

Criar uma execução de pipeline

Adicione o seguinte código ao Main método que dispara uma execução de pipeline.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Classe principal

Seu método final Main deve ter esta aparência.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Compile e execute o seu programa para acionar uma execução de pipeline!

Monitorizar uma execução de pipeline

  1. Adicione o seguinte código ao método Main:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Esse código verifica continuamente o status da execução até concluir a cópia dos dados.

  2. Adicione o seguinte código ao Main método que recupera os detalhes da execução da atividade de cópia, por exemplo, o tamanho dos dados lidos/gravados:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Executar o código

Crie e inicie a aplicação e, em seguida, verifique a execução de pipeline.

O aplicativo exibe o progresso da criação de data factory, serviço vinculado, conjuntos de dados, pipeline e execução de pipeline. Em seguida, verifica o estado de execução do pipeline. Aguarde até ver os detalhes da execução da atividade de cópia com o tamanho dos dados lidos/escritos. Em seguida, use ferramentas como o Gerenciador de Armazenamento do Azure para verificar se o blob foi copiado para outputBlobPath de inputBlobPath conforme especificado em variáveis.

Sua saída deve ser semelhante ao seguinte exemplo:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

Você executou as seguintes tarefas neste tutorial:

  • Criar uma fábrica de dados
  • Criar um serviço ligado do Armazenamento do Azure
  • Criar um conjunto de dados do Blob do Azure
  • Criar um pipeline que contém uma atividade de cópia e uma atividade da Web
  • Enviar saídas de atividades para atividades posteriores
  • Usar passagem de parâmetros e variáveis do sistema
  • Iniciar um execução de pipeline
  • Monitorizar o pipeline e execuções de atividades

Agora você pode continuar na seção Conceitos para obter mais informações sobre o Azure Data Factory.