快速入門:使用 .NET SDK 建立資料處理站和管線

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory (部分機器翻譯),這是適用於企業的全方位分析解決方案。 Microsoft Fabric (部分機器翻譯) 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用 (部分機器翻譯)!

本快速入門說明如何使用 .NET SDK 來建立 Azure Data Factory。 在此資料處理站中建立的管線會將資料從 Azure Blob 儲存體中的一個資料夾複製到其他資料夾。 如需如何使用 Azure Data Factory 轉換資料的教學課程,請參閱教學課程︰使用 Spark 轉換資料

注意

本文不提供 Data Factory 服務的詳細簡介。 如需 Azure Data Factory 服務簡介,請參閱 Azure Data Factory 簡介

必要條件

Azure 訂用帳戶

如尚未擁有 Azure 訂用帳戶,請在開始之前先建立免費帳戶

Azure 角色

若要建立 Data Factory 執行個體,您用來登入 Azure 的使用者帳戶必須是 參與者擁有者 角色的成員,或是 Azure 訂用帳戶的 系統管理員。 若要檢視您在訂用帳戶中擁有的權限,請移至 Azure 入口網站,選取右上角的使用者名稱、選取更多選項的 "..." 圖示,然後選取 [我的權限]。 如果您有多個訂用帳戶的存取權,請選取適當的訂用帳戶。

若要建立及管理 Data factory 的子資源 (包括資料集、連結服務、管線、觸發程序和整合執行階段),必須要符合下列需求:

  • 若要在 Azure 入口網站中建立和管理子資源,您必須屬於資源群組層級或更新版本的 [Data Factory 參與者] 角色。
  • 若要使用 PowerShell 或 SDK 來建立及管理子資源,具備資源層級或更高層級的參與者角色即已足夠。

如需將使用者新增至角色的範例指示,請參閱新增角色一文。

如需詳細資訊,請參閱下列文章:

Azure 儲存體帳戶

您可以使用一般用途的 Azure 儲存體帳戶 (特別是 Blob 儲存體) 作為本教學課程中的「來源」和「目的地」資料存放區。 如果您沒有一般用途的 Azure 儲存體帳戶,請參閱建立儲存體帳戶以建立帳戶。

取得儲存體帳戶名稱

您在此快速入門中需要使用 Azure 儲存體帳戶的名稱。 下列程序提供可取得儲存體帳戶名稱的步驟:

  1. 在網頁瀏覽器中,移至 Azure 入口網站,然後使用您的 Azure 使用者名稱與密碼登入。
  2. 從 [Azure 入口網站] 功能表中,選取 [所有服務],然後選取 [儲存體]>[儲存體帳戶]。 您也可以從任何頁面搜尋並選取 [儲存體帳戶]
  3. 在 [儲存體帳戶] 頁面中,篩選您的儲存體帳戶 (如有需要),然後選取您的儲存體帳戶。

您也可以從任何頁面搜尋並選取 [儲存體帳戶]

建立 Blob 容器

在這一節中,您會在 Azure Blob 儲存體中建立一個名為 adftutorial 的 Blob 容器。

  1. 從 [儲存體帳戶] 頁面中,選取 [概觀]>[容器]

  2. 在[帳戶名稱] <> - [容器] 頁面的工具列上,選取 [容器]

  3. 在 [新增容器] 對話方塊中,輸入 adftutorial 作為名稱,然後選取 [確定]。 [帳戶名稱] <> [容器] - 頁面已更新,以在容器清單中納入 adftutorial

    List of containers

新增 Blob 容器的輸入資料夾與檔案

在此節中,您會在您建立的容器中建立名為 input 的資料夾,然後將範例檔案上傳到 input 資料夾。 在您開始之前,請開啟文字編輯器 (例如 [記事本]) 並使用下列內容建立名為 emp.txt 的檔案:

John, Doe
Jane, Doe

將該檔案儲存在 C:\ADFv2QuickStartPSH 資料夾中。 (如果該資料夾不存在,請予以建立。)然後返回 Azure 入口網站並依照下列步驟執行:

  1. 在您先前退出的 [帳戶名稱] <> - [容器] 頁面中,從更新的容器清單中選取 [adftutorial]

    1. 如果您關閉視窗,或移至另一個頁面,請再次登入 Azure 入口網站
    2. 從 [Azure 入口網站] 功能表中,選取 [所有服務],然後選取 [儲存體]>[儲存體帳戶]。 您也可以從任何頁面搜尋並選取 [儲存體帳戶]
    3. 選取您的儲存體帳戶,然後選取 [容器]>[adftutorial]
  2. 在 [adftutorial] 容器頁面的工具列上,選取 [上傳]

  3. 在 [上傳 Blob] 頁面上,選取 [檔案] 方塊,然後瀏覽並選取 emp.txt 檔。

  4. 展開 [進階] 標題。 現在會顯示該頁面,如下所示:

    Select Advanced link

  5. 在 [上傳至資料夾] 方塊中,輸入 input

  6. 選取上傳按鈕。 您應該會在清單中看到 emp.txt 檔案以及上傳的狀態。

  7. 選取 [關閉] 圖示 (X) 以關閉 [上傳 Blob] 頁面。

保持 [adftutorial] 容器頁面開啟。 您可以在本快速入門結尾處使用它來確認輸出。

Visual Studio

本文中的逐步解說是使用 Visual Studio 2019。 Visual Studio 2013、2015 或 2017 的程序會稍有不同。

在 Microsoft Entra ID 中建立應用程式

作法:使用入口網站來建立可存取資源的 Microsoft Entra 應用程式和服務主體 (部分機器翻譯) 中的章節,遵循其中的指示以執行下列工作:

  1. 建立 Microsoft Entra 應用程式 (部分機器翻譯) 中,建立代表您要在本教學課程中建立之 .NET 應用程式的應用程式。 針對登入 URL,您可以提供虛擬 URL,如文章中所示 (https://contoso.org/exampleapp)。
  2. 取得值以便登入中,取得應用程式識別碼租用戶識別碼,並記下這些值以稍後在本教學課程中使用。
  3. 憑證和祕密中,取得驗證金鑰,並記下此值以稍後在本教學課程中使用。
  4. 指派角色給應用程式中,將應用程式指派給訂用帳戶層級的參與者角色,讓應用程式可以在訂用帳戶中建立資料處理站。

建立 Visual Studio 專案

接下來,在 Visual Studio 中建立 C# .NET 主控台應用程式:

  1. 啟動 Visual Studio
  2. 在 [開始] 視窗中,選取 [建立新專案]>[主控台應用程式 (.NET Framework)]。 需要 .NET 4.5.2 版或更新版本。
  3. 在 [專案名稱] 中,輸入 ADFv2QuickStart
  4. 選取 [建立] 以建立專案。

安裝 NuGet 套件

  1. 選取 [工具]>[NuGet 套件管理員]>[套件管理員主控台]

  2. 在 [套件管理員主控台] 窗格中,執行下列命令以安裝套件。 如需詳細資訊,請參閱 Azure.ResourceManager.DataFactory NuGet 套件 (英文)。

    Install-Package Azure.ResourceManager.DataFactory -IncludePrerelease
    Install-Package Azure.Identity 
    

建立資料處理站

  1. 開啟 Program.cs,併入下列陳述式以將參考新增至命名空間。

    using Azure;
    using Azure.Core;
    using Azure.Core.Expressions.DataFactory;
    using Azure.Identity;
    using Azure.ResourceManager;
    using Azure.ResourceManager.DataFactory;
    using Azure.ResourceManager.DataFactory.Models;
    using Azure.ResourceManager.Resources;
    using System;
    using System.Collections.Generic;
    
  2. 將下列程式碼新增至 Main 方法,以設定變數。 使用您自己的值取代預留位置。 如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory]依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database 等) 和計算 (HDInsight 等) 可位於其他區域。

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID where the data factory resides>";
    string resourceGroup = "<your resource group where the data factory resides>";
    string region = "<the location of your resource group>";
    string dataFactoryName = 
        "<specify the name of data factory to create. It must be globally unique.>";
    string storageAccountName = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    // specify the container and input folder from which all files 
    // need to be copied to the output folder. 
    string inputBlobContainer = "<blob container to copy data from, e.g. containername>";
    string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. inputdir/file>";
    //specify the contains and output folder where the files are copied
    string outputBlobContainer = "<blob container to copy data from, e.g. containername>";
    string outputBlobPath = "<the blob path to copy data to, e.g. outputdir/file>";
    
    // name of the Azure Storage linked service, blob dataset, and the pipeline
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string blobDatasetName = "BlobDataset";
    string pipelineName = "Adfv2QuickStartPipeline";
    
  3. 將下列程式碼新增至 Main 方法,以建立資料處理站

    ArmClient armClient = new ArmClient(
        new ClientSecretCredential(tenantID, applicationId, authenticationKey, new TokenCredentialOptions
        {
            AuthorityHost = AzureAuthorityHosts.AzurePublicCloud
        }), 
        subscriptionId, 
        new ArmClientOptions { Environment = ArmEnvironment.AzurePublicCloud }
    );
    
    ResourceIdentifier resourceIdentifier = SubscriptionResource.CreateResourceIdentifier(subscriptionId);
    SubscriptionResource subscriptionResource = armClient.GetSubscriptionResource(resourceIdentifier);
    
    Console.WriteLine("Get an existing resource group " + resourceGroupName + "...");
    var resourceGroupOperation = subscriptionResource.GetResourceGroups().Get(resourceGroupName);
    ResourceGroupResource resourceGroupResource = resourceGroupOperation.Value;
    
    Console.WriteLine("Create a data factory " + dataFactoryName + "...");
    DataFactoryData dataFactoryData = new DataFactoryData(AzureLocation.EastUS2);
    var dataFactoryOperation = resourceGroupResource.GetDataFactories().CreateOrUpdate(WaitUntil.Completed, dataFactoryName, dataFactoryData);
    Console.WriteLine(dataFactoryOperation.WaitForCompletionResponse().Content);
    
    // Get the data factory resource
    DataFactoryResource dataFactoryResource = dataFactoryOperation.Value;
    

建立連結的服務

將下列程式碼新增至 Main 方法,以建立 Azure 儲存體連結服務

您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本快速入門中,您只需要為複製來源與接收存放區建立一個 Azure Blob 儲存體連結服務;其在此範例中名為 "AzureBlobStorageLinkedService"。

// Create an Azure Storage linked service
Console.WriteLine("Create a linked service " + storageLinkedServiceName + "...");
AzureBlobStorageLinkedService azureBlobStorage = new AzureBlobStorageLinkedService()
{
    ConnectionString = azureBlobStorageConnectionString
};

DataFactoryLinkedServiceData linkedServiceData = new DataFactoryLinkedServiceData(azureBlobStorage);

var linkedServiceOperation = dataFactoryResource.GetDataFactoryLinkedServices().CreateOrUpdate(WaitUntil.Completed, storageLinkedServiceName, linkedServiceData);
Console.WriteLine(linkedServiceOperation.WaitForCompletionResponse().Content);

建立資料集

將下列程式碼新增至 Main 方法,以建立分隔符號文字資料集

您可以定義資料集來代表要從來源複製到接收的資料。 在此範例中,此分隔符號文字資料集會參考您在上一個步驟中建立的 Azure Blob 儲存體連結服務。 資料集會使用兩個參數,其值在取用資料集的活動中設定。 該參數是用來建構 "container" 和 "folderPath",以指向資料位於/儲存所在的位置。

// Create an Azure Blob dataset
DataFactoryLinkedServiceReference linkedServiceReference = new DataFactoryLinkedServiceReference(DataFactoryLinkedServiceReferenceType.LinkedServiceReference, storageLinkedServiceName);
DelimitedTextDataset delimitedTextDataset = new DelimitedTextDataset(linkedServiceReference)
{
    DataLocation = new AzureBlobStorageLocation
    {
        Container = DataFactoryElement<string>.FromExpression("@dataset().container"),
        FileName = DataFactoryElement<string>.FromExpression("@dataset().path")
    },
    Parameters =
    {
        new KeyValuePair<string, EntityParameterSpecification>("container",new EntityParameterSpecification(EntityParameterType.String)),
        new KeyValuePair<string, EntityParameterSpecification>("path",new EntityParameterSpecification(EntityParameterType.String))
    },
    FirstRowAsHeader = false,
    QuoteChar = "\"",
    EscapeChar = "\\",
    ColumnDelimiter = ","
};

DataFactoryDatasetData datasetData = new DataFactoryDatasetData(delimitedTextDataset);

var datasetOperation = dataFactoryResource.GetDataFactoryDatasets().CreateOrUpdate(WaitUntil.Completed, blobDatasetName, datasetData);
Console.WriteLine(datasetOperation.WaitForCompletionResponse().Content);

建立新管線

將下列程式碼新增至 Main 方法,以建立具有複製活動的管線

在此範例中,此管線包含一個活動,並使用四個參數:輸入 Blob 容器和路徑和輸出 Blob 容器和路徑。 這些參數的值是在觸發/執行管線時設定。 複製活動指的是在前一個步驟中建立作為輸入和輸出的同一 Blob 資料集。 當將該資料集用作輸入資料集時,即會指定輸入容器和路徑。 並且,當將該資料集用作輸出資料集時,即會指定輸出容器和路徑。

// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
DataFactoryPipelineData pipelineData = new DataFactoryPipelineData()
{
    Parameters =
    {
        new KeyValuePair<string, EntityParameterSpecification>("inputContainer",new EntityParameterSpecification(EntityParameterType.String)),
        new KeyValuePair<string, EntityParameterSpecification>("inputPath",new EntityParameterSpecification(EntityParameterType.String)),
        new KeyValuePair<string, EntityParameterSpecification>("outputContainer",new EntityParameterSpecification(EntityParameterType.String)),
        new KeyValuePair<string, EntityParameterSpecification>("outputPath",new EntityParameterSpecification(EntityParameterType.String))
    },
    Activities =
    {
        new CopyActivity("CopyFromBlobToBlob",new DataFactoryBlobSource(),new DataFactoryBlobSink())
        {
            Inputs =
            {
                new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
                {
                    Parameters =
                    {
                        new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.inputContainer\"")),
                        new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.inputPath\""))
                    }
                }
            },
            Outputs =
            {
                new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
                {
                    Parameters =
                    {
                        new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.outputContainer\"")),
                        new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.outputPath\""))
                    }
                }
            }
        }
    }
};

var pipelineOperation = dataFactoryResource.GetDataFactoryPipelines().CreateOrUpdate(WaitUntil.Completed, pipelineName, pipelineData);
Console.WriteLine(pipelineOperation.WaitForCompletionResponse().Content);

建立管線執行

將下列程式碼新增至 Main 方法,以觸發管線執行

此程式碼也會以來源和接收 Blob 路徑的實際值,設定在管線中指定的 inputContainerinputPathoutputContaineroutputPath 參數的值。

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, BinaryData> parameters = new Dictionary<string, BinaryData>()
{
    { "inputContainer",BinaryData.FromObjectAsJson(inputBlobContainer) },
    { "inputPath",BinaryData.FromObjectAsJson(inputBlobPath) },
    { "outputContainer",BinaryData.FromObjectAsJson(outputBlobContainer) },
    { "outputPath",BinaryData.FromObjectAsJson(outputBlobPath) }
};

var pipelineResource = dataFactoryResource.GetDataFactoryPipeline(pipelineName);
var runResponse = pipelineResource.Value.CreateRun(parameters);
Console.WriteLine("Pipeline run ID: " + runResponse.Value.RunId);

監視管線執行

  1. 將下列程式碼新增至 Main 方法,以持續檢查狀態,直到完成複製資料為止。

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    DataFactoryPipelineRunInfo pipelineRun;
    while (true)
    {
        pipelineRun = dataFactoryResource.GetPipelineRun(runResponse.Value.RunId.ToString());
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. 將下列程式碼新增至 Main 方法,以取出複製活動執行的詳細資料,例如被讀取或寫入的資料大小。

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    var queryResponse = dataFactoryResource.GetActivityRun(pipelineRun.RunId.ToString(), 
        new RunFilterContent(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)));
    
    var enumerator = queryResponse.GetEnumerator();
    enumerator.MoveNext();
    
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(enumerator.Current.Output);
    else
        Console.WriteLine(enumerator.Current.Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

執行程式碼

建置並啟動應用程式,然後確認管線執行。

主控台會印出建立資料處理站、連結服務、資料集、管線和管線執行的進度。 然後會檢查管線執行狀態。 等待出現複製活動執行詳細資料,以及讀取/寫入資料的大小。 然後,使用 Azure 儲存體總管之類的工具,檢查 Blob 已從 "inputBlobPath" 被複製到 "outputBlobPath" (如您在變數中所指定)。

範例輸出

Create a data factory quickstart-adf...
{
  "name": "quickstart-adf",
  "type": "Microsoft.DataFactory/factories",
  "properties": {
    "provisioningState": "Succeeded",
    "version": "2018-06-01"
  },
  "location": "eastus2"
}
Create a linked service AzureBlobStorage...
{
  "name": "AzureBlobStorage",
  "type": "Microsoft.DataFactory/factories/linkedservices",
  "properties": {
    "type": "AzureBlobStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;",
      "encryptedCredential": "<encryptedCredential>"
    }
  }
}
Creating dataset BlobDelimitedDataset...
{
  "name": "BlobDelimitedDataset",
  "type": "Microsoft.DataFactory/factories/datasets",
  "properties": {
    "type": "DelimitedText",
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureBlobStorage"
    },
    "parameters": {
      "container": {
        "type": "String"
      },
      "path": {
        "type": "String"
      }
    },
    "typeProperties": {
      "location": {
        "container": {
          "type": "Expression",
          "value": "@dataset().container"
        },
        "type": "AzureBlobStorageLocation",
        "fileName": {
          "type": "Expression",
          "value": "@dataset().path"
        }
      },
      "columnDelimiter": ",",
      "quoteChar": "\"",
      "escapeChar": "\\",
      "firstRowAsHeader": false
    }
  }
}
Creating pipeline Adfv2QuickStartPipeline...
{
  "properties": {
    "activities": [
      {
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDelimitedDataset",
            "parameters": {
              "container": "@pipeline().parameters.inputContainer",
              "path": "@pipeline().parameters.inputPath"
            }
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDelimitedDataset",
            "parameters": {
              "container": "@pipeline().parameters.outputContainer",
              "path": "@pipeline().parameters.outputPath"
            }
          }
        ],
        "name": "CopyFromBlobToBlob",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        }
      }
    ],
    "parameters": {
      "inputContainer": {
        "type": "String"
      },
      "inputPath": {
        "type": "String"
      },
      "outputContainer": {
        "type": "String"
      },
      "outputPath": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 3aa26ffc-5bee-4db9-8bac-ccbc2d7b51c1
Checking pipeline run status...
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 1048,
  "dataWritten": 1048,
  "filesRead": 1,
  "filesWritten": 1,
  "sourcePeakConnections": 1,
  "sinkPeakConnections": 1,
  "copyDuration": 8,
  "throughput": 1.048,
  "errors": [],
  "effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
  "usedDataIntegrationUnits": 4,
  "billingReference": {
    "activityType": "DataMovement",
    "billableDuration": [
      {
        "meterType": "AzureIR",
        "duration": 0.06666666666666667,
        "unit": "DIUHours"
      }
    ],
    "totalBillableDuration": [
      {
        "meterType": "AzureIR",
        "duration": 0.06666666666666667,
        "unit": "DIUHours"
      }
    ]
  },
  "usedParallelCopies": 1,
  "executionDetails": [
    {
      "source": {
        "type": "AzureBlobStorage"
      },
      "sink": {
        "type": "AzureBlobStorage"
      },
      "status": "Succeeded",
      "start": "2023-12-15T10:25:33.9991558Z",
      "duration": 8,
      "usedDataIntegrationUnits": 4,
      "usedParallelCopies": 1,
      "profile": {
        "queue": {
          "status": "Completed",
          "duration": 5
        },
        "transfer": {
          "status": "Completed",
          "duration": 1,
          "details": {
            "listingSource": {
              "type": "AzureBlobStorage",
              "workingDuration": 0
            },
            "readingFromSource": {
              "type": "AzureBlobStorage",
              "workingDuration": 0
            },
            "writingToSink": {
              "type": "AzureBlobStorage",
              "workingDuration": 0
            }
          }
        }
      },
      "detailedDurations": {
        "queuingDuration": 5,
        "transferDuration": 1
      }
    }
  ],
  "dataConsistencyVerification": {
    "VerificationResult": "NotVerified"
  }
}

Press any key to exit...

驗證輸出

管道會自動在 adftutorial Blob 容器中建立輸出資料夾。 然後,它會將 emp.txt 檔案從輸入資料夾複製到輸出資料夾。

  1. 在 Azure 入口網站中,於您在上述為 Blob 容器新增輸入資料夾和檔案一節中停止時所位於的 adftutorial 容器頁面上,選取 [重新整理] 以查看輸出資料夾。
  2. 在資料夾清單中,選取 [輸出]
  3. 確認 emp.txt 已複製到輸出資料夾。

清除資源

若要以程式設計方式刪除資料處理站,請將下列程式碼新增至程式:

Console.WriteLine("Deleting the data factory");
dataFactoryResource.Delete(WaitUntil.Completed);

下一步

在此範例中的管線會將資料從 Azure Blob 儲存體中的一個位置複製到其他位置。 瀏覽教學課程以了解使用 Data Factory 的更多案例。