Rychlé zprovoznění: Vytvoření datové továrny a kanálu pomocí sady .NET SDK

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tento rychlý start popisuje, jak pomocí sady .NET SDK vytvořit Azure Data Factory. Kanál, který vytvoříte v této datové továrně , kopíruje data z jedné složky do jiné ve službě Azure Blob Storage. Kurz transformace dat pomocí Azure Data Factory najdete v kurzu Transformace dat pomocí Sparku.

Poznámka

Tento článek neposkytuje podrobný úvod do služby Data Factory. Úvod do služby Azure Data Factory najdete v tématu Úvod do Azure Data Factory.

Požadavky

Předplatné Azure

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Role Azure

Pro vytvoření instancí služby Data Factory musí být uživatelský účet, který použijete pro přihlášení k Azure, členem role přispěvatel nebo vlastník nebo správcem předplatného Azure. Pokud chcete zobrazit oprávnění, která máte v předplatném, přejděte na Azure Portal, v pravém horním rohu vyberte své uživatelské jméno, vyberte ikonu ..., abyste zobrazili další možnosti, a pak vyberte Moje oprávnění. Pokud máte přístup k několika předplatným, vyberte odpovídající předplatné.

Při vytváření a správě podřízených prostředků pro službu Data Factory, včetně datových sad, propojených služeb, kanálů, triggerů a prostředí Integration Runtime, platí následující požadavky:

  • Pokud chcete vytvářet a spravovat podřízené prostředky v Azure Portal, musíte patřit do role Přispěvatel data Factory na úrovni skupiny prostředků nebo vyšší.
  • Pro vytváření a správu podřízených prostředků pomocí PowerShellu nebo sady SDK na úrovni prostředku nebo vyšší je dostatečná role Přispěvatel.

Ukázku pokynů pro přidání uživatele do role najdete v článku věnovaném přidávání rolí.

Další informace najdete v následujících článcích:

Účet služby Azure Storage

V tomto rychlém startu použijete účet Azure Storage pro obecné účely (konkrétně úložiště objektů blob) jako zdrojová i cílová úložiště dat. Pokud účet Azure Storage pro obecné účely nemáte, přečtěte si téma Vytvoření účtu úložiště a jeho vytvoření.

Získání názvu účtu úložiště

Pro účely tohoto rychlého startu potřebujete název účtu služby Azure Storage. Následující postup obsahuje postup, jak získat název účtu úložiště:

  1. Ve webovém prohlížeči přejděte na Azure Portal a přihlaste se pomocí svého uživatelského jména a hesla Azure.
  2. V nabídce Azure Portal vyberte Všechny služby a pakúčty úložiště>. Účty úložiště můžete také vyhledat a vybrat na libovolné stránce.
  3. Na stránce Účty úložiště vyfiltrujte svůj účet úložiště (v případě potřeby) a pak vyberte svůj účet úložiště.

Účty úložiště můžete také vyhledat a vybrat na libovolné stránce.

Vytvoření kontejneru objektů blob

V této části vytvoříte v úložišti objektů blob v Azure kontejner objektů blob s názvem adftutorial.

  1. Na stránce účtu úložiště vyberte Kontejnery přehledu>.

  2. Na panelu <nástrojů stránky Název účtu> - Kontejnery vyberte Kontejner.

  3. V dialogovém okně Nový kontejner jako název zadejte adftutorial a pak vyberte OK. Stránka <Název účtu> - Kontejnery se aktualizuje tak, aby v seznamu kontejnerů obsahovala adftutorial .

    Seznam kontejnerů

Přidání vstupní složky a souboru pro kontejner objektů blob

V této části vytvoříte v kontejneru, který jste vytvořili, složku s názvem input a pak do vstupní složky nahrajete ukázkový soubor. Než začnete, otevřete textový editor, například Poznámkový blok, a vytvořte soubor s názvem emp.txt s následujícím obsahem:

John, Doe
Jane, Doe

Uložte soubor do složky C:\ADFv2QuickStartPSH . (Pokud složka ještě neexistuje, vytvořte ji.) Pak se vraťte do Azure Portal a postupujte takto:

  1. Na stránceNázevúčtu>Kontejnery, kde jste skončili, vyberte v aktualizovaném seznamu kontejnerů adftutorial<. -

    1. Pokud jste okno zavřeli nebo přešli na jinou stránku, přihlaste se k Azure Portal znovu.
    2. V nabídce Azure Portal vyberte Všechny služby a pakúčty úložiště>. Účty úložiště můžete také vyhledat a vybrat na libovolné stránce.
    3. Vyberte svůj účet úložiště a pak vyberte Kontejnery>adftutorial.
  2. Na panelu nástrojů stránky kontejneru adftutorial vyberte Nahrát.

  3. Na stránce Nahrát objekt blob vyberte pole Soubory a pak vyhledejte a vyberte souboremp.txt .

  4. Rozbalte nadpis Upřesnit . Stránka se teď zobrazí takto:

    Výběr odkazu Upřesnit

  5. Do pole Nahrát do složky zadejte vstup.

  6. Vyberte tlačítko Nahrát. Měli byste vidět soubor emp.txt a stav nahrávání v seznamu.

  7. Výběrem ikony Zavřít ( X) zavřete stránku Nahrát objekt blob .

Nechte stránku kontejneru adftutorial otevřenou . Použijete ji k ověření výstupu na konci tohoto rychlého startu.

Visual Studio

Návod v tomto článku používá Visual Studio 2019. Postupy pro Visual Studio 2013, 2015 nebo 2017 se mírně liší.

Vytvoření aplikace v Azure Active Directory

V částech Postupy: Použití portálu k vytvoření aplikace Azure AD a instančního objektu, které mají přístup k prostředkům, postupujte podle pokynů k těmto úlohám:

  1. V části Vytvoření aplikace Azure Active Directory vytvořte aplikaci, která představuje aplikaci .NET, kterou vytváříte v tomto kurzu. Jako přihlašovací adresu URL můžete poskytnout fiktivní URL, jak ukazuje článek (https://contoso.org/exampleapp).
  2. V části Získat hodnoty pro přihlášení získejte ID aplikace a ID tenanta a poznamenejte si tyto hodnoty, které použijete později v tomto kurzu.
  3. V části Certifikáty a tajné kódy získejte ověřovací klíč a poznamenejte si tuto hodnotu, kterou použijete později v tomto kurzu.
  4. V části Přiřazení aplikace k roli přiřaďte aplikaci k roli Přispěvatel na úrovni předplatného, aby aplikace v předplatném vytvořila datové továrny.

Vytvoření projektu ve Visual Studiu

Dále vytvořte konzolovou aplikaci C# .NET v sadě Visual Studio:

  1. Spusťte Visual Studio.
  2. V okně Start vyberte Vytvořit novou konzolovou aplikaci projektu>(.NET Framework). Vyžaduje se .NET verze 4.5.2 nebo novější.
  3. Do pole Název projektu zadejte ADFv2QuickStart.
  4. Vyberte Vytvořit a vytvořte projekt.

Instalace balíčků NuGet

  1. Vyberte Nástroje>Správce balíčků NuGet>Konzola správce balíčků.

  2. V podokně Konzola Správce balíčků spusťte následující příkazy pro instalaci balíčků. Další informace najdete v balíčku NuGet Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.Identity.Client
    

Vytvoření klienta datové továrny

  1. Otevřete soubor Program.cs a vložte do něj následující příkazy. Přidáte tak odkazy na obory názvů.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.Identity.Client;
    
  2. Do metody Main přidejte následující kód, který nastaví proměnné. Zástupné symboly nahraďte vlastními hodnotami. Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, Azure SQL Database a další) a výpočetní prostředky (HDInsight a další) používané datovou továrnou můžou být v jiných oblastech.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID where the data factory resides>";
    string resourceGroup = "<your resource group where the data factory resides>";
    string region = "<the location of your resource group>";
    string dataFactoryName = 
        "<specify the name of data factory to create. It must be globally unique.>";
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    // specify the container and input folder from which all files 
    // need to be copied to the output folder. 
    string inputBlobPath =
        "<path to existing blob(s) to copy data from, e.g. containername/inputdir>";
    //specify the contains and output folder where the files are copied
    string outputBlobPath =
        "<the blob path to copy data to, e.g. containername/outputdir>";
    
    // name of the Azure Storage linked service, blob dataset, and the pipeline
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string blobDatasetName = "BlobDataset";
    string pipelineName = "Adfv2QuickStartPipeline";
    

Poznámka

Pro suverénní cloudy musíte použít příslušné koncové body specifické pro cloud pro ActiveDirectoryAuthority a ResourceManagerUrl (BaseUri). Například v us Azure Gov byste místo použili autoritu https://login.microsoftonline.ushttps://login.microsoftonline.comhttps://management.usgovcloudapi.net a místo https://management.azure.com/a pak vytvořili klienta pro správu datové továrny. Pomocí PowerShellu můžete snadno získat adresy URL koncových bodů pro různé cloudy spuštěním příkazu Get-AzEnvironment | Format-List" vrátí seznam koncových bodů pro každé cloudové prostředí.

  1. Do metody Main přidejte následující kód, který vytvoří instanci třídy DataFactoryManagementClient . Tento objekt použijete k vytvoření datové továrny, propojené služby, datových sad a kanálu. Použijete ho také k monitorování podrobných informací o spuštění kanálu.

    // Authenticate and create a data factory management client
    IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(applicationId)
     .WithAuthority("https://login.microsoftonline.com/" + tenantID)
     .WithClientSecret(authenticationKey)
     .WithLegacyCacheCompatibility(false)
     .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
     .Build();
    
    AuthenticationResult result = await app.AcquireTokenForClient(
      new string[]{ "https://management.azure.com//.default"})
       .ExecuteAsync();
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) {
        SubscriptionId = subscriptionId };
    

Vytvoření datové továrny

Do metody Main přidejte následující kód, který vytvoří datovou továrnu.

// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
       "PendingCreation")
{
    System.Threading.Thread.Sleep(1000);
}

Vytvoření propojené služby

Do metody Main přidejte následující kód, který vytvoří propojenou službu Azure Storage.

V datové továrně vytvoříte propojené služby, abyste svá úložiště dat a výpočetní služby spojili s datovou továrnou. V tomto rychlém startu stačí vytvořit pouze jednu propojenou službu Azure Storage pro zdroj kopírování i úložiště jímky. V ukázce má název AzureStorageLinkedService.

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey)
    }
);
client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
    storageLinkedService, client.SerializationSettings));

Vytvoření datové sady

Do metody Main přidejte následující kód, který vytvoří datovou sadu objektů blob Azure.

Definujete datovou sadu, která představuje data pro kopírování ze zdroje do jímky. Tato datová sada objektů blob v tomto příkladu odkazuje na propojenou službu Azure Storage, kterou jste vytvořili v předchozím kroku: Datová sada přebírá parametr, jehož hodnota je nastavená v aktivitě, která tuto datovou sadu využívá. Parametr se používá k vytvoření "folderPath" odkazující na místo, kde se data nacházejí nebo jsou uložena.

// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = new Expression { Value = "@{dataset().path}" },
        Parameters = new Dictionary<string, ParameterSpecification>
        {
            { "path", new ParameterSpecification { Type = ParameterType.String } }
        }
    }
);
client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));

Vytvoření kanálu

Do metody Main přidejte následující kód, který vytvoří kanál s aktivitou kopírování.

V tomto příkladu tento kanál obsahuje jednu aktivitu a přebírá dva parametry: vstupní cestu k objektu blob a výstupní cestu k objektu blob. Hodnoty pro tyto parametry se nastaví při aktivaci nebo spuštění kanálu. Aktivita kopírování odkazuje na stejnou datovou sadu objektů blob, kterou jste vytvořili v předchozím kroku jako vstup a výstup. Když se tato datová sada použije jako vstupní, zadá se vstupní cesta. A když se tato datová sada použije jako výstupní, zadá se výstupní cesta.

// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "inputPath", new ParameterSpecification { Type = ParameterType.String } },
        { "outputPath", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToBlob",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference()
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.inputPath" }
                    }
                }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.outputPath" }
                    }
                }
            },
            Source = new BlobSource { },
            Sink = new BlobSink { }
        }
    }
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

Vytvoření spuštění kanálu

Do metody Main přidejte následující kód, který aktivuje spuštění kanálu.

Tento kód také nastaví hodnoty parametrů inputPath a outputPath zadaných v kanálu se skutečnými hodnotami cest ke zdrojovému objektu blob a jímce.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
    { "inputPath", inputBlobPath },
    { "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Monitorování spuštění kanálu

  1. Do metody Main přidejte následující kód, který nepřetržitě kontroluje stav, dokud se kopírování dat nedokončí.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Do metody Main přidejte následující kód, který načte podrobnosti o spuštění aktivity kopírování, například velikost přečtených nebo zapsaných dat.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Spuštění kódu

Sestavte a spusťte aplikaci a potom ověřte spuštění kanálu.

Konzola vytiskne průběh vytváření datové továrny, propojených služeb, datových sad, kanálu a spuštění kanálu. Potom zkontroluje stav spuštění kanálu. Počkejte, až se zobrazí podrobnosti o spuštění aktivity kopírování s velikostí dat pro čtení a zápis. Pak pomocí nástrojů, jako je Průzkumník služby Azure Storage, zkontrolujte, že se objekty blob zkopírují do "outputBlobPath" z "inputBlobPath", jak jste zadali v proměnných.

Ukázkový výstup

Creating data factory SPv2Factory0907...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
        "type": "SecureString"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@{dataset().path}",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "AzureStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "path": {
        "type": "String"
      }
    }
  }
}
Creating pipeline Adfv2QuickStartPipeline...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.inputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.outputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "name": "CopyFromBlobToBlob"
      }
    ],
    "parameters": {
      "inputPath": {
        "type": "String"
      },
      "outputPath": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
    "dataRead": 331452208,
    "dataWritten": 331452208,
    "copyDuration": 23,
    "throughput": 14073.209,
    "errors": [],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West US)",
    "usedDataIntegrationUnits": 2,
    "billedDuration": 23
}

Press any key to exit...

Ověření výstupu

Kanál automaticky vytvoří výstupní složku v kontejneru objektů blob adftutorial . Potom zkopíruje souboremp.txt ze vstupní složky do výstupní složky.

  1. V Azure Portal na stránce kontejneru adftutorial, na které jste se zastavili v části Přidat vstupní složku a soubor pro kontejner objektů blob výše, vyberte Aktualizovat, aby se zobrazila výstupní složka.
  2. V seznamu složek vyberte výstup.
  3. Potvrďte, že je do výstupní složky zkopírovaný soubor emp.txt.

Vyčištění prostředků

Pokud chcete datovou továrnu odstranit prostřednictvím kódu programu, přidejte do programu následující řádky kódu:

Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);

Další kroky

Kanál v této ukázce kopíruje data z jednoho umístění do jiného umístění v úložišti objektů blob Azure. Projděte si kurzy, kde se dozvíte o použití služby Data Factory ve více scénářích.