Het vertakken en koppelen van activiteiten in een Data Factory-pijplijn

Van toepassing op: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory in Microsoft Fabric is de volgende generatie van Azure Data Factory, met een eenvoudigere architectuur, ingebouwde AI en nieuwe functies. Als u nieuw bent in gegevensintegratie, begint u met Fabric Data Factory. Bestaande ADF-workloads kunnen upgraden naar Fabric om toegang te krijgen tot nieuwe mogelijkheden voor gegevenswetenschap, realtime analyses en rapportage.

In deze zelfstudie maakt u een Data Factory-pijplijn die enkele stroombeheerfuncties demonstreert. Deze pijplijn kopieert van een container in Azure Blob Storage naar een andere container in hetzelfde opslagaccount. Als de kopieerbewerking is geslaagd, worden de details over de geslaagde kopieerbewerking via de pijplijn verzonden in een e-mail. Deze informatie kan de hoeveelheid geschreven gegevens omvatten. Als de kopieerbewerking is mislukt, worden de details over de mislukte kopieerbewerking (zoals de foutmelding) verzonden in een e-mail. Tijdens de handleiding ziet u hoe u parameters kunt doorgeven.

Deze afbeelding biedt een overzicht van het scenario:

Diagram toont Azure Blob Storage, het doel van een kopie, die bij succes een e-mail met details verzendt of, bij een fout, een e-mailbericht met foutdetails verzendt.

In deze zelfstudie leert u hoe u de volgende taken uitvoert:

  • Een data factory maken
  • Een gekoppelde Azure Storage-service maken
  • Een Azure Blob-gegevensset maken
  • Een pijplijn met een kopieeractiviteit en een webactiviteit maken
  • Stuur uitvoer van activiteiten naar volgende activiteiten
  • Parameters doorgeven en systeemvariabelen gebruiken
  • Een pijplijnuitvoering starten
  • De uitvoering van de pijplijn en van de activiteit controleren

In deze zelfstudie wordt gebruikgemaakt van .NET SDK. U kunt andere mechanismen gebruiken om te communiceren met Azure Data Factory. Zie Quickstarts van 5 minuten voor Data Factory-quickstarts.

Als u geen Azure-abonnement hebt, maakt u een vrij account voordat u begint.

Voorvereisten

  • Azure opslagaccount. U gebruikt de blobopslag als bron-gegevensopslag. Als u geen Azure opslagaccount hebt, raadpleegt u Maak een opslagaccount.
  • Azure Storage Explorer. Zie Azure Storage Explorer om dit hulpprogramma te installeren.
  • Azure SQL Database. U gebruikt de database als sink-gegevensopslag. Als u geen database in Azure SQL Database hebt, raadpleegt u de Database maken in Azure SQL Database.
  • Visual Studio. In dit artikel wordt gebruikgemaakt van Visual Studio 2019.
  • Azure .NET SDK. Download en installeer de Azure .NET SDK.

Zie Products beschikbaar per regio voor een lijst met Azure regio's waarin Data Factory momenteel beschikbaar is. De gegevensopslag en berekeningen kunnen zich in andere regio’s bevinden. De winkels bevatten Azure Storage en Azure SQL Database. De berekeningen bevatten HDInsight, die Data Factory gebruikt.

Maak een toepassing zoals beschreven in Maak een Microsoft Entra-toepassing. Wijs de toepassing toe aan de rol Inzender door de instructies in hetzelfde artikel te volgen. U hebt verschillende waarden nodig voor latere onderdelen van deze zelfstudie, zoals Toepassings-id (client) en Directory-id (tenant).

Een blobtabel maken

  1. Open een teksteditor. Kopieer de volgende tekst en sla deze lokaal op als input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Open Azure Storage Explorer. Breid uw opslagaccount uit. Klik met de rechtermuisknop op Blobcontainers en selecteer Blobcontainer maken.

  3. Geef de nieuwe container de naam adfv2branch en selecteer Uploaden om het bestand input.txt toe te voegen aan de container.

Visual Studio project maken

Een C#-.NET-consoletoepassing maken:

  1. Start Visual Studio en selecteer Maak een nieuw project.
  2. Kies in Maak een nieuw projectConsole-app (.NET Framework) voor C# en selecteer Volgende.
  3. Geef het project de naam ADFv2BranchTutorial.
  4. Selecteer .NET versie 4.5.2 of hoger en selecteer vervolgens Maak.

NuGet-pakketten installeren

  1. Selecteer Tools>NuGet Package Manager>Package Manager Console.

  2. Voer in de Package Manager Console de volgende opdrachten uit om pakketten te installeren. Raadpleeg Microsoft.Azure. Management.DataFactory nuget-pakket voor meer informatie.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Een data factory-client maken

  1. Open Program.cs en voeg de volgende instructies toe:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Voeg deze statische variabelen toe aan de klasse Program. Vervang de plaatsaanduidingen door uw eigen waarden.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Voeg de volgende code toe aan de methode Main. Deze code maakt een instantie van klasse DataFactoryManagementClient. U gebruikt dit object vervolgens voor het maken van een data factory, een gekoppelde service, gegevenssets en een pijplijn. U kunt dit object ook gebruiken om de details van de pijplijnuitvoering te controleren.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Een data factory maken

  1. Een methode CreateOrUpdateDataFactory toevoegen aan uw bestand Program.cs:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Voeg de volgende regel toe aan de methode Main waarmee een data factory wordt gemaakt:

    Factory df = CreateOrUpdateDataFactory(client);
    

Een gekoppelde Azure Storage-service maken

  1. Een methode StorageLinkedServiceDefinition toevoegen aan uw bestand Program.cs:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Voeg de volgende regel toe aan de methode Main waarmee een Azure Storage gekoppelde service wordt gemaakt:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Zie Eigenschappen van een gekoppelde service voor meer informatie over ondersteunde eigenschappen en details.

Gegevenssets maken

In dit gedeelte maakt u twee gegevenssets: één voor de bron en de andere voor de sink.

Een gegevensset maken voor een bron-Azure-blob

Voeg een methode toe die een Azure Blob-dataset maakt. Voor meer informatie over ondersteunde eigenschappen en details, zie Azure Blob-gegevensseteigenschappen.

Een methode SourceBlobDatasetDefinition toevoegen aan uw bestand Program.cs:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

U definieert een gegevensset die de brongegevens in Azure Blob vertegenwoordigt. Deze Blob-gegevensset verwijst naar de gekoppelde Azure Storage-service die in de vorige stap werd ondersteund. De Blob-gegevensset beschrijft de locatie van de blob waaruit moet worden gekopieerd: FolderPath en FileName.

Let op het gebruik van parameters voor FolderPath. sourceBlobContainer is de naam van de parameter en de expressie wordt vervangen door de waarden die worden doorgegeven in de pijplijnuitvoering. De syntaxis voor het definiëren van parameters is@pipeline().parameters.<parameterName>

Een gegevensset maken voor een sink-Azure Blob

  1. Een methode SourceBlobDatasetDefinition toevoegen aan uw bestand Program.cs:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Voeg de volgende code toe aan de Main-methode die zowel Azure Blob-bron- als -doelgegevenssets maakt.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Maak een C#-klasse: EmailRequest

Maak in uw C#-project een klasse met de naam EmailRequest. Deze klasse definieert welke eigenschappen de pijplijn in de inhoud van het verzoek verzendt bij het verzenden van een e-mail. In deze zelfstudie verzendt de pijplijn vier eigenschappen van de pijplijn naar de e-mail:

  • Bericht. Hoofdtekst van het e-mailbericht. Voor een geslaagde kopie bevat deze eigenschap de hoeveelheid gegevens die is geschreven. Voor een mislukte kopie bevat deze eigenschap details over de fout.
  • Datafabrieknaam Naam van de datafabriek.
  • Naam pijplijn. Naam van de pijplijn.
  • Ontvanger. Parameter die wordt doorgegeven. Met deze eigenschap wordt de ontvanger van het e-mailbericht opgegeven.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Eindpunten voor de e-mailwerkstroom maken

Als u het verzenden van een e-mailbericht wilt activeren, gebruikt u Azure Logic Apps om de werkstroom te definiëren. Zie Een voorbeeld van een workflow voor een logische consumptie-app maken voor meer informatie.

Werkstroom voor e-mail met succesbericht

Maak in de Azure portal een werkstroom voor logische apps met de naam CopySuccessEmail. Voeg de aanvraagtrigger toe met de naam Wanneer een HTTP-aanvraag wordt ontvangen. Vul in de aanvraagtrigger het vak aanvraagbody JSON-schema in met de volgende JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Uw werkstroom ziet er ongeveer uit als het volgende voorbeeld:

Succes e-mail werkstroom

Deze JSON-inhoud correspondeert met de EmailRequest-klasse die u in het vorige gedeelte hebt gemaakt.

Voeg de actie Office 365 Outlook met de naam Een e-mail verzenden toe. Pas voor deze actie aan hoe u het e-mailbericht wilt opmaken met behulp van de eigenschappen die zijn doorgegeven in het JSON-schema van de aanvraagbody. Hier volgt een voorbeeld:

Werkstroomontwerper met de actie Een e-mailbericht verzenden.

Nadat u de werkstroom hebt opgeslagen, moet u de waarde HTTP POST URL van de trigger kopiëren en opslaan.

Mislukken e-mailworkflow

Kloon de workflow van de logische app naar een nieuwe workflow met de naam CopySuccessEmail. In de aanvraagtrigger is het JSON-schema voor de aanvraagbody hetzelfde. Wijzig de indeling van uw e-mailbericht met behulp van Subject om er een faal-e-mail van te maken. Dit is een voorbeeld:

Werkstroomontwerper en de workflow voor mislukte e-mails.

Nadat u de werkstroom hebt opgeslagen, moet u de waarde HTTP POST URL van de trigger kopiëren en opslaan.

U zou nu twee werkstroom-URL’s moeten hebben, zoals in de volgende voorbeelden:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Een pipeline maken

Ga terug naar uw project in Visual Studio. We voegen nu de code toe die een pijplijn creëert met een kopieeractiviteit en de eigenschap DependsOn. In deze zelfstudie bevat deze pijplijn maar één activiteit: een kopieeractiviteit waarbij de Blob-gegevensset als bron wordt gebruikt en een andere Blob-gegevensset als sink. Als de kopieeractiviteit slaagt of mislukt, worden verschillende e-mailtaken aangeroepen.

In deze pijpelijn gebruikt u de volgende functies:

  • Parameters
  • Webactiviteit
  • Afhankelijkheid van activiteiten
  • De uitvoer van een activiteit gebruiken als invoer voor een andere activiteit
  1. Voeg deze methode toe aan uw project. De volgende gedeelten bevatten meer details.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Voeg de volgende regel toe aan de methode Main waarmee de pijplijn wordt gemaakt:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parameters

Het eerste gedeelte van onze pijplijncode definieert parameters.

  • sourceBlobContainer. De bron-blob-gegevensset gebruikt deze parameter in de pijplijn.
  • sinkBlobContainer. De sink-blob-gegevensset gebruikt deze parameter in de pijplijn.
  • receiver. De twee webactiviteiten in de pijplijn die e-mailberichten over succes of mislukking naar de ontvanger sturen, gebruiken deze parameter.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Webactiviteit

De webactiviteit staat een aanroep toe naar elk REST-eindpunt. Zie Webactiviteit in Azure Data Factory voor meer informatie over de activiteit. Deze pijplijn gebruikt een webactiviteit voor het aanroepen van de Logic Apps-e-mailwerkstroom. U maakt twee webactiviteiten: een die de werkstroom CopySuccessEmail aanroept, en één die CopyFailWorkFlow aanroept.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Plak de eindpunten van de Url uit uw Logic Apps-werkstroom in de eigenschap . Geef in de eigenschap Body een exemplaar van de klasse EmailRequest door. De e-mailaanvraag bevat de volgende eigenschappen:

  • Bericht. Hiermee wordt de waarde van @{activity('CopyBlobtoBlob').output.dataWritten doorgegeven. Leest een eigenschap van de vorige kopieeractiviteit en geeft de waarde van dataWritten door. In geval van een fout, geef de foutuitvoer door in plaats van @{activity('CopyBlobtoBlob').error.message.
  • Naam van de datafactory. Geeft de waarde van @{pipeline().DataFactory} door. Met deze systeemvariabele hebt u toegang tot de bijbehorende data factory-naam. Voor een lijst van systeemvariabelen raadpleegt u Systeemvariabelen.
  • Naam pijplijn. Hiermee wordt de waarde van @{pipeline().Pipeline} doorgegeven. Met deze systeemvariabele hebt u toegang tot de bijbehorende pijplijnnaam.
  • Ontvanger. Hiermee wordt de waarde van "@pipeline().parameters.receiver" doorgegeven. Biedt toegang tot de pijplijnparameters.

Deze code maakt een nieuwe activiteitafhankelijkheid, die afhankelijk is van de vorige kopieeractiviteit.

Een pijplijnrun maken

Voeg de volgende code toe aan de methode Mainwaarmee een pijplijnuitvoering wordt geactiveerd.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main-klasse

Uw laatste Main-methode moet er zo uitzien.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Bouw het programma en voer het uit om een pijplijnuitvoering te activeren.

Een pijplijnuitvoering controleren

  1. Voeg de volgende code toe aan de methode Main:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Met deze code wordt doorlopend de status van de uitvoering gecontroleerd totdat deze klaar is met het kopiëren van de gegevens.

  2. Voeg de volgende code toe aan de methode Main om details van de kopieeractiviteit weer te geven, zoals de omvang van de gelezen of weggeschreven gegevens:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

De code uitvoeren

Bouw en start de toepassing en controleer vervolgens de uitvoering van de pijplijn.

In de toepassing wordt de voortgang weergegeven van het maken van een data factory, een gekoppelde service, data sets, pijplijn en pijplijnuitvoering. Vervolgens wordt de status van de uitvoering van de pijplijn gecontroleerd. Wacht totdat u details ziet van de uitvoering van de kopieeractiviteit, waaronder de omvang van de gelezen/weggeschreven gegevens. Gebruik vervolgens hulpprogramma's zoals Azure Storage Explorer om te controleren of de blob is gekopieerd naar outputBlobPath uit inputBlobPath zoals u hebt opgegeven in variabelen.

Uw uitvoer moet lijken op het volgende voorbeeld:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

U hebt de volgende taken uitgevoerd in deze zelfstudie:

  • Een data factory maken
  • Een gekoppelde Azure Storage-service maken
  • Een Azure Blob-gegevensset maken
  • Een pijplijn met een kopieeractiviteit en een webactiviteit maken
  • Stuur uitvoer van activiteiten naar volgende activiteiten
  • Parameters doorgeven en systeemvariabelen gebruiken
  • Een pijplijnuitvoering starten
  • De uitvoering van de pijplijn en van de activiteit controleren

U kunt nu doorgaan naar de sectie Concepten voor meer informatie over Azure Data Factory.