Förgrening och kedjning av aktiviteter i en Data Factory-pipeline

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Tips

Data Factory i Microsoft Fabric är nästa generations Azure Data Factory, med en enklare arkitektur, inbyggd AI och nya funktioner. Om dataintegrering är nytt för dig börjar du med Fabric Data Factory. Befintliga ADF-arbetsbelastningar kan uppgraderas till Fabric för att få åtkomst till nya funktioner inom datavetenskap, realtidsanalys och rapportering.

I den här självstudien skapar du en Data Factory-pipeline som visar vissa kontrollflödesfunktioner. Den här pipelinen kopieras från en container i Azure Blob Storage till en annan container på samma lagringskonto. Om kopieringsaktiviteten lyckas skickar pipelinen information om den lyckade kopieringsåtgärden i ett e-postmeddelande. Den informationen kan omfatta mängden data som skrivits. Om kopieringsaktiviteten misslyckas skickar den information om kopieringsfelet, till exempel felmeddelandet, i ett e-postmeddelande. Genom hela självstudiekursen får du se hur man skickar parametrar.

Den här bilden ger en översikt över scenariot:

Diagram visar Azure Blob Storage, som är målet för en kopia, som vid lyckat resultat skickar ett e-postmeddelande med information eller vid fel skickar ett e-postmeddelande med felinformation.

Den här tutorialen visar så här hur du utför följande uppgifter:

  • Skapa en datafabrik
  • Skapa en Azure Storage länkad tjänst
  • Skapa en Azure Blob-datauppsättning
  • Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
  • Skicka utdata för aktiviteter till efterföljande aktiviteter
  • Använda parameteröverföring och systemvariabler
  • Starta en pipelinekörning
  • Övervaka pipelinen och aktivitetskörningar

I den här handledningen används .NET SDK. Du kan använda andra mekanismer för att interagera med Azure Data Factory. För snabbstarter av Data Factory, se 5-Minute Quickstarts.

Om du inte har någon Azure prenumeration skapar du ett free-konto innan du börjar.

Förutsättningar

  • Azure Storage konto. Du använder bloblagring som källdatalager. Om du inte har något Azure lagringskonto kan du läsa Skapa ett lagringskonto.
  • Azure Storage Explorer. Information om hur du installerar det här verktyget finns i Azure Storage Explorer.
  • Azure SQL Database. Du använder databasen som mottagare för datalagringen. Om du inte har någon databas i Azure SQL Database kan du läsa Skapa en databas i Azure SQL Database.
  • Visual Studio. Den här artikeln använder Visual Studio 2019.
  • Azure .NET SDK. Ladda ned och installera Azure .NET SDK.

En lista över Azure regioner där Data Factory är tillgängligt finns i Products available by region. Datalager och beräkningar kan finnas i andra regioner. Butikerna innehåller Azure Storage och Azure SQL Database. Beräkningen omfattar HDInsight, som Data Factory använder.

Skapa ett program enligt beskrivningen i Skapa ett Microsoft Entra-program. Tilldela programmet rollen Deltagare genom att följa anvisningarna i samma artikel. Du behöver flera värden för senare delar av den här självstudien, till exempel program (klient)-ID och katalog (klientorganisation)-ID.

Skapa en blobtabell

  1. Öppna en textredigerare. Kopiera följande text och spara den lokalt som input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Öppna Azure Storage Explorer. Expandera ditt lagringskonto. Högerklicka på Blobcontainrar och välj Skapa blobcontainer.

  3. Ge den nya containern namnet adfv2branch och välj Ladda upp för att lägga till filen input.txt i containern.

Skapa Visual Studio projekt

Skapa ett C#-.NET konsolprogram:

  1. Starta Visual Studio och välj Skapa ett nytt projekt.
  2. I Skapa ett nytt projekt väljer du Konsolapp (.NET Framework) för C# och väljer Nästa.
  3. Ge projektet namnet ADFv2BranchTutorial.
  4. Välj .NET version 4.5.2 eller senare och välj sedan Skapa.

Installera NuGet-paket

  1. Välj Tools>NuGet Package Manager>Package Manager Console.

  2. I Package Manager Console kör du följande kommandon för att installera paket. Se Microsoft.Azure. Management.DataFactory nuget package för mer information.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Skapa en datafabriksklient

  1. Öppna Program.cs och lägg till följande instruktioner:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Lägg till dessa statiska variabler i Program klassen. Ersätt platshållarna med dina egna värden.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Lägg till följande kod i metoden Main. Den här koden skapar en instans av DataFactoryManagementClient klassen. Sedan använder du det här objektet för att skapa datafabrik, länkad tjänst, datauppsättningar och pipeline. Du kan också använda det här objektet för att övervaka pipelinekörningsinformationen.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Skapa en datafabrik

  1. Lägg till en CreateOrUpdateDataFactory metod i din Program.cs-fil :

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Lägg till följande rad i metoden Main som skapar en datafabrik:

    Factory df = CreateOrUpdateDataFactory(client);
    

Skapa en Azure Storage länkad tjänst

  1. Lägg till en StorageLinkedServiceDefinition metod i din Program.cs-fil :

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Lägg till följande rad i metoden Main som skapar en Azure Storage länkad tjänst:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Mer information om stöttade egenskaper och detaljer finns i Länkade tjänstegenskaper.

Skapa datauppsättningar

I det här avsnittet skapar du två datauppsättningar, en för källan och en för mottagaren.

Skapa en datauppsättning för en källa Azure Blob

Lägg till en metod som skapar en Azure blob-datauppsättning. För mer information om stödda egenskaper och detaljer, se Azure Blob-datasetegenskaper.

Lägg till en SourceBlobDatasetDefinition metod i din Program.cs-fil :

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Du definierar en datauppsättning som representerar källdata i Azure Blob. Den här blobdatauppsättningen refererar till den Azure Storage länkade tjänst som stöds i föregående steg. Blobdatauppsättningen beskriver platsen för bloben som ska kopieras från: FolderPath och FileName.

Observera användningen av parametrar för FolderPath. sourceBlobContainer är namnet på parametern och uttrycket ersätts med de värden som skickas i pipelinekörningen. Syntaxen för att definiera parametrar är @pipeline().parameters.<parameterName>

Skapa en datauppsättning för en mottagare Azure Blob

  1. Lägg till en SourceBlobDatasetDefinition metod i din Program.cs-fil :

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Lägg till följande kod i metoden Main som skapar både Azure Blob-käll- och mottagardatauppsättningar.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Skapa en C#-klass: EmailRequest

I ditt C#-projekt skapar du en klass med namnet EmailRequest. Den här klassen definierar vilka egenskaper pipelinen skickar i brödtextbegäran när du skickar ett e-postmeddelande. I den här självstudiekursen skickar pipelinen fyra egenskaper från pipelinen till e-postmeddelandet:

  • Meddelande. E-postmeddelandets brödtext För en lyckad kopiering innehåller den här egenskapen mängden data som har skrivits. För en misslyckad kopia innehåller den här egenskapen information om felet.
  • Namn på datafabrik. Namnet på datafabriken.
  • Pipelinenamn. Namnet på pipelinen.
  • Mottagare. Parameter som går igenom. Den här egenskapen anger mottagaren av e-postmeddelandet.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Skapa slutpunkter för e-postarbetsflödet

Om du vill utlösa sändning av ett e-postmeddelande använder du Azure Logic Apps för att definiera arbetsflödet. Mer information finns i Skapa ett exempel på ett arbetsflöde för en förbrukningslogikapp.

Lyckat e-postarbetsflöde

I Azure-portalen skapar du ett logikapparbetsflöde med namnet CopySuccessEmail. Lägg till triggern med namnet När en HTTP-begäran tas emot. I begäranutlösaren fyller du i rutan Begärandetext JSON-schema med följande JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Arbetsflödet ser ut ungefär som i följande exempel:

Lyckat e-postarbetsflöde

Det här JSON-innehållet överensstämmer med den EmailRequest klass som du skapade i föregående avsnitt.

Lägg till åtgärden Office 365 Outlook med namnet Send ett e-postmeddelande. För den här åtgärden anpassar du hur du vill formatera e-postmeddelandet med hjälp av egenskaperna som skickas i JSON-schemat för begärandetext. Här är ett exempel:

Arbetsflödesdesigner med åtgärden Skicka ett e-postmeddelande.

När du har sparat flödet kopierar och sparar du HTTP POST-URL-värdet från utlösaren.

Misslyckat arbetsflöde för e-post

Klona logikappens CopySuccessEmail arbetsflöde till ett nytt arbetsflöde med namnet CopyFailEmail. I begärandeutlösaren är JSON-schemat för begärandetext detsamma. Ändra formatet på ditt e-postmeddelande som Subject för att anpassa det till ett felmeddelande. Här är ett exempel:

Arbetsflödesdesignern och arbetsflödet för e-post som inte fungerar.

När du har sparat flödet kopierar och sparar du HTTP POST-URL-värdet från utlösaren.

Du bör nu ha två arbetsflödes-URL:er, som följande exempel:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Skapa en pipeline

Gå tillbaka till projektet i Visual Studio. Nu ska vi lägga till koden som skapar en pipeline med en kopieringsaktivitet och DependsOn -egenskap. I den här självstudien innehåller pipelinen en aktivitet, en kopieringsaktivitet, som tar in blobdatauppsättningen som källa och en annan Blob-datauppsättning som mottagare. Om kopieringsaktiviteten lyckas eller misslyckas anropas olika e-postmeddelanden.

I denna pipeline kan du använda följande funktioner:

  • Parametrar
  • Webbaktivitet
  • Aktivitetsberoende
  • Använda utdata från en aktivitet som indata till en annan aktivitet
  1. Lägg till den här metoden i projektet. Följande avsnitt innehåller mer information.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Lägg till följande rad i metoden Main som skapar pipelinen:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parametrar

Det första avsnittet i vår pipelinekod definierar parametrar.

  • sourceBlobContainer. Källblobdatauppsättningen använder den här parametern i pipelinen.
  • sinkBlobContainer. Datauppsättningen för mottagarblob använder den här parametern i pipelinen.
  • receiver. De två webbaktiviteterna i pipelinen som skickar lyckade eller misslyckade e-postmeddelanden till mottagaren använder den här parametern.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Webbaktivitet

Webbaktiviteten tillåter ett anrop till valfri REST-slutpunkt. Mer information om aktiviteten finns i Web-aktivitet i Azure Data Factory. Den här pipelinen använder en webbaktivitet för att anropa Logic Apps e-postarbetsflöde. Du skapar två webbaktiviteter: en som anropar arbetsflödet CopySuccessEmail och en som anropar CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

I egenskapen Url klistrar du in HTTP POST-URL-slutpunkterna från dina Logic Apps-arbetsflöden. I egenskapen Body skickar du en instans av EmailRequest klassen. E-postbegäran innehåller följande egenskaper:

  • Meddelande. Skickar värdet för @{activity('CopyBlobtoBlob').output.dataWritten. Får tillgång till en egenskap av den tidigare kopieringsaktiviteten och skickar värdet av dataWritten. Vid ett fel skickas felutdata i stället för @{activity('CopyBlobtoBlob').error.message.
  • Namn på datafabrik. Skickar värdet för @{pipeline().DataFactory} Den här systemvariabeln gör att du kan komma åt motsvarande datafabriksnamn. En lista över systemvariabler finns i Systemvariabler.
  • Pipelinenamn. Skickar värdet för @{pipeline().Pipeline}. Med den här systemvariabeln kan du komma åt motsvarande pipelinenamn.
  • Mottagare. Skickar värdet för "@pipeline().parameters.receiver". Kommer åt pipelineparametrarna.

Den här koden skapar ett nytt aktivitetsberoende som är beroende av den tidigare kopieringsaktiviteten.

Skapa en pipelinekörning

Lägg till följande kod i metoden Main som utlöser en pipelinekörning.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main-klass

Den slutliga Main metoden bör se ut så här.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Skapa och kör programmet för att utlösa en pipelinekörning!

Övervaka en pipelinekörning

  1. Lägg till följande kod i metoden Main:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Den här koden kontrollerar kontinuerligt statusen för programkörningen tills den är klar med att kopiera data.

  2. Lägg till följande kod i metoden Main som hämtar information om kopieringsaktivitetskörning, till exempel storleken på data som lästs/skrivits:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Kör koden

Skapa och starta programmet och kontrollera sedan pipelinekörningen.

Programmet visar förloppet för att skapa datafabrik, länkad tjänst, datauppsättningar, pipeline och pipelinekörning. Sedan kontrolleras status för pipelinekörningen. Vänta tills du ser information om körningen av kopieringsaktiviteten med storlek för lästa/skrivna data. Använd sedan verktyg som Azure Storage Explorer för att kontrollera att bloben kopierades till outputBlobPath från inputBlobPath som du angav i variabler.

Dina utdata bör likna följande exempel:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

Du har gjort följande uppgifter i den här självstudien:

  • Skapa en datafabrik
  • Skapa en Azure Storage länkad tjänst
  • Skapa en Azure Blob-datauppsättning
  • Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
  • Skicka utdata för aktiviteter till efterföljande aktiviteter
  • Använda parameteröverföring och systemvariabler
  • Starta en pipelinekörning
  • Övervaka pipelinen och aktivitetskörningar

Du kan nu fortsätta till avsnittet Begrepp för mer information om Azure Data Factory.