快速入門:使用 .NET SDK 建立資料處理站和管線

適用於:Azure Data Factory Azure Synapse Analytics

本快速入門說明如何使用 .NET SDK 來建立 Azure Data Factory。 在此資料處理站中建立的管線會將資料從 Azure Blob 儲存體中的一個資料夾複製到其他資料夾。 如需如何使用 Azure Data Factory 轉換資料的教學課程,請參閱教學課程︰使用 Spark 轉換資料

注意

本文不提供 Data Factory 服務的詳細簡介。 如需 Azure Data Factory 服務簡介,請參閱 Azure Data Factory 簡介

必要條件

Azure 訂用帳戶

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

Azure 角色

若要建立 Data Factory 執行個體,您用來登入 Azure 的使用者帳戶必須為「參與者」 或「擁有者」 角色,或是 Azure 訂用帳戶的「管理員」 。 若要檢視您在訂用帳戶中擁有的權限,請移至 Azure 入口網站,選取右上角的使用者名稱、選取更多選項的 " ... " 圖示,然後選取 [我的權限] 。 如果您有多個訂用帳戶的存取權,請選取適當的訂用帳戶。

若要建立及管理 Data factory 的子資源 (包括資料集、連結服務、管線、觸發程序和整合執行階段),必須要符合下列需求:

  • 若要在 Azure 入口網站中建立及管理子資源,您必須屬於資源群組層級或更高層級的 Data Factory 參與者角色。
  • 若要使用 PowerShell 或 SDK 來建立及管理子資源,具備資源層級或更高層級的參與者角色即已足夠。

如需將使用者新增至角色的範例指示,請參閱新增角色一文。

如需詳細資訊,請參閱下列文章:

Azure 儲存體帳戶

您可以使用一般用途的 Azure 儲存體帳戶 (特別是 Blob 儲存體) 作為本教學課程中的「來源」 和「目的地」 資料存放區。 如果您沒有一般用途的 Azure 儲存體帳戶,請參閱建立儲存體帳戶以建立帳戶。

取得儲存體帳戶名稱

您在此快速入門中需要使用 Azure 儲存體帳戶的名稱。 下列程序提供可取得儲存體帳戶名稱的步驟:

  1. 在網頁瀏覽器中,移至 Azure 入口網站,然後使用您的 Azure 使用者名稱與密碼登入。
  2. 從 [Azure 入口網站] 功能表中,選取 [所有服務] ,然後選取 [儲存體] >[儲存體帳戶] 。 您也可以從任何頁面搜尋並選取 [儲存體帳戶] 。
  3. 在 [儲存體帳戶] 頁面中,篩選您的儲存體帳戶 (如有需要),然後選取您的儲存體帳戶。

您也可以從任何頁面搜尋並選取 [儲存體帳戶] 。

建立 Blob 容器

在這一節中,您會在 Azure Blob 儲存體中建立一個名為 adftutorial 的 Blob 容器。

  1. 從 [儲存體帳戶] 頁面中,選取 [概觀] >[容器] 。

  2. 在 <[帳戶名稱]> - [容器] 頁面的工具列上,選取 [容器]。

  3. 在 [新增容器] 對話方塊中,輸入 adftutorial 作為名稱,然後選取 [確定] 。 <[帳戶名稱]> - [容器] 頁面已更新,以在容器清單中納入 [adftutorial]。

    容器清單

新增 Blob 容器的輸入資料夾與檔案

在此節中,您會在您建立的容器中建立名為 input 的資料夾,然後將範例檔案上傳到 input 資料夾。 在您開始之前,請開啟文字編輯器 (例如 [記事本] ) 並使用下列內容建立名為 emp.txt 的檔案:

John, Doe
Jane, Doe

將該檔案儲存在 C:\ADFv2QuickStartPSH 資料夾中。 (如果該資料夾不存在,請予以建立。)然後返回 Azure 入口網站並依照下列步驟執行:

  1. 在您先前退出的 <[帳戶名稱]> - [容器] 頁面中,從更新的容器清單中選取 [adftutorial]。

    1. 如果您關閉視窗,或移至另一個頁面,請再次登入 Azure 入口網站
    2. 從 [Azure 入口網站] 功能表中,選取 [所有服務] ,然後選取 [儲存體] >[儲存體帳戶] 。 您也可以從任何頁面搜尋並選取 [儲存體帳戶] 。
    3. 選取您的儲存體帳戶,然後選取 [容器] >[adftutorial] 。
  2. 在 [adftutorial] 容器頁面的工具列上,選取 [上傳] 。

  3. 在 [上傳 Blob] 頁面上,選取 [檔案] 方塊,然後瀏覽並選取 emp.txt 檔。

  4. 展開 [進階] 標題。 現在會顯示該頁面,如下所示:

    選取進階連結

  5. 在 [上傳至資料夾] 方塊中,輸入 input

  6. 選取 [上傳] 按鈕。 您應該會在清單中看到 emp.txt 檔案以及上傳的狀態。

  7. 選取 [關閉] 圖示 (X) 以關閉 [上傳 Blob] 頁面。

保持 [adftutorial] 容器頁面開啟。 您可以在本快速入門結尾處使用它來確認輸出。

Visual Studio

本文中的逐步解說是使用 Visual Studio 2019。 Visual Studio 2013、2015 或 2017 的程序會稍有不同。

在 Azure Active Directory 中建立應用程式

操作說明:使用入口網站來建立可存取資源的 Azure AD 應用程式和服務主體中的小節,遵循其中的指示以執行下列工作:

  1. 建立 Azure Active Directory 應用程式中,建立代表您要在本教學課程中建立之 .NET 應用程式的應用程式。 針對登入 URL,您可以提供虛擬 URL,如文章中所示 (https://contoso.org/exampleapp)。
  2. 取得值以便登入中,取得應用程式識別碼租用戶識別碼,並記下這些值以稍後在本教學課程中使用。
  3. 憑證和祕密中,取得驗證金鑰,並記下此值以稍後在本教學課程中使用。
  4. 指派角色給應用程式中,將應用程式指派給訂用帳戶層級的參與者角色,讓應用程式可以在訂用帳戶中建立資料處理站。

建立 Visual Studio 專案

接下來,在 Visual Studio 中建立 C# .NET 主控台應用程式:

  1. 啟動 Visual Studio
  2. 在 [開始] 視窗中,選取 [建立新專案] >[主控台應用程式 (.NET Framework)] 。 需要 .NET 4.5.2 版或更新版本。
  3. 在 [專案名稱] 中,輸入 ADFv2QuickStart
  4. 選取 [Create] \(建立\) 以建立專案。

安裝 NuGet 套件

  1. 選取 [工具] >[NuGet 套件管理員] >[套件管理員主控台] 。

  2. 在 [套件管理員主控台] 窗格中,執行下列命令以安裝套件。 如需詳細資訊,請參閱 Microsoft.Azure.Management.DataFactory nuget 套件 \(英文\)。

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.Identity.Client
    

建立資料處理站用戶端

  1. 開啟 Program.cs,併入下列陳述式以將參考新增至命名空間。

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.Identity.Client;
    
  2. 將下列程式碼新增至 Main 方法,以設定變數。 將預留位置取代為您自己的值。 如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory] :依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database 等) 和計算 (HDInsight 等) 可位於其他區域。

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID where the data factory resides>";
    string resourceGroup = "<your resource group where the data factory resides>";
    string region = "<the location of your resource group>";
    string dataFactoryName = 
        "<specify the name of data factory to create. It must be globally unique.>";
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    // specify the container and input folder from which all files 
    // need to be copied to the output folder. 
    string inputBlobPath =
        "<path to existing blob(s) to copy data from, e.g. containername/inputdir>";
    //specify the contains and output folder where the files are copied
    string outputBlobPath =
        "<the blob path to copy data to, e.g. containername/outputdir>";
    
    // name of the Azure Storage linked service, blob dataset, and the pipeline
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string blobDatasetName = "BlobDataset";
    string pipelineName = "Adfv2QuickStartPipeline";
    

注意

針對主權雲端,您必須針對 ActiveDirectoryAuthority 和 ResourceManagerUrl (BaseUri) 使用適當的雲端專用端點。 例如,在 US Azure Gov 中,您會使用 https://login.microsoftonline.us 的授權單位,而不是 https://login.microsoftonline.com ,並使用 https://management.usgovcloudapi.net,而不是 https://management.azure.com/ ,然後才建立資料處理站管理用戶端。 您可以透過執行「Get-AzEnvironment | Format-List」,使用 PowerShell 輕鬆地取得各種雲端的端點,其會傳回每個雲端環境的端點清單。

  1. 將下列程式碼新增至 Main 方法,以建立 DataFactoryManagementClient 類別的執行個體。 您會使用此物件來建立資料處理站、連結服務、資料集和管線。 您也可以使用此物件來監視管線執行的詳細資料。

    // Authenticate and create a data factory management client
    IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(applicationId)
     .WithAuthority("https://login.microsoftonline.com/" + tenantID)
     .WithClientSecret(authenticationKey)
     .WithLegacyCacheCompatibility(false)
     .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
     .Build();
    
    AuthenticationResult result = await app.AcquireTokenForClient(
      new string[]{ "https://management.azure.com//.default"})
       .ExecuteAsync();
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) {
        SubscriptionId = subscriptionId };
    

建立 Data Factory

將下列程式碼新增至 Main 方法,以建立資料處理站

// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
       "PendingCreation")
{
    System.Threading.Thread.Sleep(1000);
}

建立連結的服務

將下列程式碼新增至 Main 方法,以建立 Azure 儲存體連結服務

您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本快速入門中,您只需要為複製來源與接收存放區建立一個 Azure 儲存體連結服務;其在此範例中名為 "AzureStorageLinkedService"。

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey)
    }
);
client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
    storageLinkedService, client.SerializationSettings));

建立資料集

將下列程式碼新增至 Main 方法,以建立 Azure Blob 資料集

您可以定義資料集來代表要從來源複製到接收的資料。 在此範例中,此 Blob 資料集會參考您在上一個步驟中建立的 Azure 儲存體連結服務。 資料集會使用一個參數,其值在取用資料集的活動中設定。 該參數是用來建構 "folderPath",以指向資料位於/儲存所在的位置。

// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = new Expression { Value = "@{dataset().path}" },
        Parameters = new Dictionary<string, ParameterSpecification>
        {
            { "path", new ParameterSpecification { Type = ParameterType.String } }
        }
    }
);
client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));

建立管線

將下列程式碼新增至 Main 方法,以建立具有複製活動的管線

在此範例中,此管線包含一個活動,並使用兩個參數:輸入 Blob 路徑和輸出 Blob 路徑。 這些參數的值是在觸發/執行管線時設定。 複製活動指的是在前一個步驟中建立作為輸入和輸出的同一 Blob 資料集。 將該資料集用作輸入資料集時,即會指定輸入路徑。 並且,將該資料集用作輸出資料集時,即會指定輸出路徑。

// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "inputPath", new ParameterSpecification { Type = ParameterType.String } },
        { "outputPath", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToBlob",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference()
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.inputPath" }
                    }
                }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.outputPath" }
                    }
                }
            },
            Source = new BlobSource { },
            Sink = new BlobSink { }
        }
    }
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

建立管線執行

將下列程式碼新增至 Main 方法,以觸發管線執行

此程式碼也會以來源和接收 Blob 路徑的實際值,設定在管線中指定的 inputPathoutputPath 參數的值。

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
    { "inputPath", inputBlobPath },
    { "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

監視管線執行

  1. 將下列程式碼新增至 Main 方法,以持續檢查狀態,直到完成複製資料為止。

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. 將下列程式碼新增至 Main 方法,以取出複製活動執行的詳細資料,例如被讀取或寫入的資料大小。

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

執行程式碼

建置並啟動應用程式,然後確認管線執行。

主控台會印出建立資料處理站、連結服務、資料集、管線和管線執行的進度。 然後會檢查管線執行狀態。 等待出現複製活動執行詳細資料,以及讀取/寫入資料的大小。 然後,使用 Azure 儲存體總管之類的工具,檢查 Blob 已從 "inputBlobPath" 被複製到 "outputBlobPath" (如您在變數中所指定)。

範例輸出

Creating data factory SPv2Factory0907...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
        "type": "SecureString"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@{dataset().path}",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "AzureStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "path": {
        "type": "String"
      }
    }
  }
}
Creating pipeline Adfv2QuickStartPipeline...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.inputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.outputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "name": "CopyFromBlobToBlob"
      }
    ],
    "parameters": {
      "inputPath": {
        "type": "String"
      },
      "outputPath": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
    "dataRead": 331452208,
    "dataWritten": 331452208,
    "copyDuration": 23,
    "throughput": 14073.209,
    "errors": [],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West US)",
    "usedDataIntegrationUnits": 2,
    "billedDuration": 23
}

Press any key to exit...

驗證輸出

管道會自動在 adftutorial Blob 容器中建立輸出資料夾。 然後,它會將 emp.txt 檔案從輸入資料夾複製到輸出資料夾。

  1. 在 Azure 入口網站中,於您在上述為 Blob 容器新增輸入資料夾和檔案一節中停止時所位於的 adftutorial 容器頁面上,選取 [重新整理] 以查看輸出資料夾。
  2. 在資料夾清單中,選取 [輸出] 。
  3. 確認 emp.txt 已複製到輸出資料夾。

清除資源

若要以程式設計方式刪除資料處理站,請將下列程式碼新增至程式:

Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);

後續步驟

在此範例中的管線會將資料從 Azure Blob 儲存體中的一個位置複製到其他位置。 瀏覽教學課程以了解使用 Data Factory 的更多案例。