事件
3月31日 下午11時 - 4月2日 下午11時
最大的網狀架構、Power BI 和 SQL 學習事件。 3 月 31 日 - 4 月 2 日。 使用程式代碼 FABINSIDER 來節省 $400 美元。
立即註冊適用於:Azure Data Factory
Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用 (部分機器翻譯)!
本快速入門說明如何使用 .NET SDK 來建立 Azure Data Factory。 在此資料處理站中建立的管線會將資料從 Azure Blob 儲存體中的一個資料夾複製到其他資料夾。 如需如何使用 Azure Data Factory 轉換資料的教學課程,請參閱教學課程︰使用 Spark 轉換資料。
注意
本文不提供 Data Factory 服務的詳細簡介。 如需 Azure Data Factory 服務簡介,請參閱 Azure Data Factory 簡介。
如尚未擁有 Azure 訂用帳戶,請在開始之前先建立免費帳戶。
若要建立 Data Factory 執行個體,您用來登入 Azure 的使用者帳戶必須是 參與者 或 擁有者 角色的成員,或是 Azure 訂用帳戶的 系統管理員。 若要檢視您在訂用帳戶中擁有的權限,請移至 Azure 入口網站,選取右上角的使用者名稱、選取更多選項的 "..." 圖示,然後選取 [我的權限]。 如果您有多個訂用帳戶的存取權,請選取適當的訂用帳戶。
若要建立及管理 Data factory 的子資源 (包括資料集、連結服務、管線、觸發程序和整合執行階段),必須要符合下列需求:
如需將使用者新增至角色的範例指示,請參閱新增角色一文。
如需詳細資訊,請參閱下列文章:
您可以使用一般用途的 Azure 儲存體帳戶 (特別是 Blob 儲存體) 作為本教學課程中的「來源」和「目的地」資料存放區。 如果您沒有一般用途的 Azure 儲存體帳戶,請參閱建立儲存體帳戶以建立帳戶。
您在此快速入門中需要使用 Azure 儲存體帳戶的名稱。 下列程序提供可取得儲存體帳戶名稱的步驟:
您也可以從任何頁面搜尋並選取 [儲存體帳戶]。
在這一節中,您會在 Azure Blob 儲存體中建立一個名為 adftutorial 的 Blob 容器。
從 [儲存體帳戶] 頁面中,選取 [概觀]>[容器]。
在[帳戶名稱] <> - [容器] 頁面的工具列上,選取 [容器]。
在 [新增容器] 對話方塊中,輸入 adftutorial 作為名稱,然後選取 [確定]。 [帳戶名稱] <> [容器] - 頁面已更新,以在容器清單中納入 adftutorial。
在此節中,您會在您建立的容器中建立名為 input 的資料夾,然後將範例檔案上傳到 input 資料夾。 在您開始之前,請開啟文字編輯器 (例如 [記事本]) 並使用下列內容建立名為 emp.txt 的檔案:
John, Doe
Jane, Doe
將該檔案儲存在 C:\ADFv2QuickStartPSH 資料夾中。 (如果該資料夾不存在,請予以建立。)然後返回 Azure 入口網站並依照下列步驟執行:
在您先前退出的 [帳戶名稱] <> - [容器] 頁面中,從更新的容器清單中選取 [adftutorial]。
在 [adftutorial] 容器頁面的工具列上,選取 [上傳]。
在 [上傳 Blob] 頁面上,選取 [檔案] 方塊,然後瀏覽並選取 emp.txt 檔。
展開 [進階] 標題。 現在會顯示該頁面,如下所示:
在 [上傳至資料夾] 方塊中,輸入 input。
選取上傳按鈕。 您應該會在清單中看到 emp.txt 檔案以及上傳的狀態。
選取 [關閉] 圖示 (X) 以關閉 [上傳 Blob] 頁面。
保持 [adftutorial] 容器頁面開啟。 您可以在本快速入門結尾處使用它來確認輸出。
本文中的逐步解說是使用 Visual Studio 2019。 Visual Studio 2013、2015 或 2017 的程序會稍有不同。
從作法:使用入口網站來建立可存取資源的 Microsoft Entra 應用程式和服務主體 (部分機器翻譯) 中的章節,遵循其中的指示以執行下列工作:
https://contoso.org/exampleapp
)。接下來,在 Visual Studio 中建立 C# .NET 主控台應用程式:
選取 [工具]>[NuGet 套件管理員]>[套件管理員主控台]。
在 [套件管理員主控台] 窗格中,執行下列命令以安裝套件。 如需詳細資訊,請參閱 Azure.ResourceManager.DataFactory NuGet 套件 (英文)。
Install-Package Azure.ResourceManager.DataFactory -IncludePrerelease
Install-Package Azure.Identity
開啟 Program.cs,併入下列陳述式以將參考新增至命名空間。
using Azure;
using Azure.Core;
using Azure.Core.Expressions.DataFactory;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;
using Azure.ResourceManager.Resources;
using System;
using System.Collections.Generic;
將下列程式碼新增至 Main 方法,以設定變數。 使用您自己的值取代預留位置。 如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory]:依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database 等) 和計算 (HDInsight 等) 可位於其他區域。
// Set variables
string tenantID = "<your tenant ID>";
string applicationId = "<your application ID>";
string authenticationKey = "<your authentication key for the application>";
string subscriptionId = "<your subscription ID where the data factory resides>";
string resourceGroup = "<your resource group where the data factory resides>";
string region = "<the location of your resource group>";
string dataFactoryName =
"<specify the name of data factory to create. It must be globally unique.>";
string storageAccountName = "<your storage account name to copy data>";
string storageKey = "<your storage account key>";
// specify the container and input folder from which all files
// need to be copied to the output folder.
string inputBlobContainer = "<blob container to copy data from, e.g. containername>";
string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. inputdir/file>";
//specify the contains and output folder where the files are copied
string outputBlobContainer = "<blob container to copy data from, e.g. containername>";
string outputBlobPath = "<the blob path to copy data to, e.g. outputdir/file>";
// name of the Azure Storage linked service, blob dataset, and the pipeline
string storageLinkedServiceName = "AzureStorageLinkedService";
string blobDatasetName = "BlobDataset";
string pipelineName = "Adfv2QuickStartPipeline";
將下列程式碼新增至 Main 方法,以建立資料處理站。
ArmClient armClient = new ArmClient(
new ClientSecretCredential(tenantID, applicationId, authenticationKey, new TokenCredentialOptions
{
AuthorityHost = AzureAuthorityHosts.AzurePublicCloud
}),
subscriptionId,
new ArmClientOptions { Environment = ArmEnvironment.AzurePublicCloud }
);
ResourceIdentifier resourceIdentifier = SubscriptionResource.CreateResourceIdentifier(subscriptionId);
SubscriptionResource subscriptionResource = armClient.GetSubscriptionResource(resourceIdentifier);
Console.WriteLine("Get an existing resource group " + resourceGroupName + "...");
var resourceGroupOperation = subscriptionResource.GetResourceGroups().Get(resourceGroupName);
ResourceGroupResource resourceGroupResource = resourceGroupOperation.Value;
Console.WriteLine("Create a data factory " + dataFactoryName + "...");
DataFactoryData dataFactoryData = new DataFactoryData(AzureLocation.EastUS2);
var dataFactoryOperation = resourceGroupResource.GetDataFactories().CreateOrUpdate(WaitUntil.Completed, dataFactoryName, dataFactoryData);
Console.WriteLine(dataFactoryOperation.WaitForCompletionResponse().Content);
// Get the data factory resource
DataFactoryResource dataFactoryResource = dataFactoryOperation.Value;
將下列程式碼新增至 Main 方法,以建立 Azure 儲存體連結服務。
您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本快速入門中,您只需要為複製來源與接收存放區建立一個 Azure Blob 儲存體連結服務;其在此範例中名為 "AzureBlobStorageLinkedService"。
// Create an Azure Storage linked service
Console.WriteLine("Create a linked service " + storageLinkedServiceName + "...");
AzureBlobStorageLinkedService azureBlobStorage = new AzureBlobStorageLinkedService()
{
ConnectionString = azureBlobStorageConnectionString
};
DataFactoryLinkedServiceData linkedServiceData = new DataFactoryLinkedServiceData(azureBlobStorage);
var linkedServiceOperation = dataFactoryResource.GetDataFactoryLinkedServices().CreateOrUpdate(WaitUntil.Completed, storageLinkedServiceName, linkedServiceData);
Console.WriteLine(linkedServiceOperation.WaitForCompletionResponse().Content);
將下列程式碼新增至 Main 方法,以建立分隔符號文字資料集。
您可以定義資料集來代表要從來源複製到接收的資料。 在此範例中,此分隔符號文字資料集會參考您在上一個步驟中建立的 Azure Blob 儲存體連結服務。 資料集會使用兩個參數,其值在取用資料集的活動中設定。 該參數是用來建構 "container" 和 "folderPath",以指向資料位於/儲存所在的位置。
// Create an Azure Blob dataset
DataFactoryLinkedServiceReference linkedServiceReference = new DataFactoryLinkedServiceReference(DataFactoryLinkedServiceReferenceType.LinkedServiceReference, storageLinkedServiceName);
DelimitedTextDataset delimitedTextDataset = new DelimitedTextDataset(linkedServiceReference)
{
DataLocation = new AzureBlobStorageLocation
{
Container = DataFactoryElement<string>.FromExpression("@dataset().container"),
FileName = DataFactoryElement<string>.FromExpression("@dataset().path")
},
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("container",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("path",new EntityParameterSpecification(EntityParameterType.String))
},
FirstRowAsHeader = false,
QuoteChar = "\"",
EscapeChar = "\\",
ColumnDelimiter = ","
};
DataFactoryDatasetData datasetData = new DataFactoryDatasetData(delimitedTextDataset);
var datasetOperation = dataFactoryResource.GetDataFactoryDatasets().CreateOrUpdate(WaitUntil.Completed, blobDatasetName, datasetData);
Console.WriteLine(datasetOperation.WaitForCompletionResponse().Content);
將下列程式碼新增至 Main 方法,以建立具有複製活動的管線。
在此範例中,此管線包含一個活動,並使用四個參數:輸入 Blob 容器和路徑和輸出 Blob 容器和路徑。 這些參數的值是在觸發/執行管線時設定。 複製活動指的是在前一個步驟中建立作為輸入和輸出的同一 Blob 資料集。 當將該資料集用作輸入資料集時,即會指定輸入容器和路徑。 並且,當將該資料集用作輸出資料集時,即會指定輸出容器和路徑。
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
DataFactoryPipelineData pipelineData = new DataFactoryPipelineData()
{
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("inputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("inputPath",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputPath",new EntityParameterSpecification(EntityParameterType.String))
},
Activities =
{
new CopyActivity("CopyFromBlobToBlob",new DataFactoryBlobSource(),new DataFactoryBlobSink())
{
Inputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.inputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.inputPath\""))
}
}
},
Outputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.outputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.outputPath\""))
}
}
}
}
}
};
var pipelineOperation = dataFactoryResource.GetDataFactoryPipelines().CreateOrUpdate(WaitUntil.Completed, pipelineName, pipelineData);
Console.WriteLine(pipelineOperation.WaitForCompletionResponse().Content);
將下列程式碼新增至 Main 方法,以觸發管線執行。
此程式碼也會以來源和接收 Blob 路徑的實際值,設定在管線中指定的 inputContainer、inputPath、outputContainer 和 outputPath 參數的值。
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, BinaryData> parameters = new Dictionary<string, BinaryData>()
{
{ "inputContainer",BinaryData.FromObjectAsJson(inputBlobContainer) },
{ "inputPath",BinaryData.FromObjectAsJson(inputBlobPath) },
{ "outputContainer",BinaryData.FromObjectAsJson(outputBlobContainer) },
{ "outputPath",BinaryData.FromObjectAsJson(outputBlobPath) }
};
var pipelineResource = dataFactoryResource.GetDataFactoryPipeline(pipelineName);
var runResponse = pipelineResource.Value.CreateRun(parameters);
Console.WriteLine("Pipeline run ID: " + runResponse.Value.RunId);
將下列程式碼新增至 Main 方法,以持續檢查狀態,直到完成複製資料為止。
// Monitor the pipeline run
Console.WriteLine("Checking pipeline run status...");
DataFactoryPipelineRunInfo pipelineRun;
while (true)
{
pipelineRun = dataFactoryResource.GetPipelineRun(runResponse.Value.RunId.ToString());
Console.WriteLine("Status: " + pipelineRun.Status);
if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
System.Threading.Thread.Sleep(15000);
else
break;
}
將下列程式碼新增至 Main 方法,以取出複製活動執行的詳細資料,例如被讀取或寫入的資料大小。
// Check the copy activity run details
Console.WriteLine("Checking copy activity run details...");
var queryResponse = dataFactoryResource.GetActivityRun(pipelineRun.RunId.ToString(),
new RunFilterContent(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)));
var enumerator = queryResponse.GetEnumerator();
enumerator.MoveNext();
if (pipelineRun.Status == "Succeeded")
Console.WriteLine(enumerator.Current.Output);
else
Console.WriteLine(enumerator.Current.Error);
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
建置並啟動應用程式,然後確認管線執行。
主控台會印出建立資料處理站、連結服務、資料集、管線和管線執行的進度。 然後會檢查管線執行狀態。 等待出現複製活動執行詳細資料,以及讀取/寫入資料的大小。 然後,使用 Azure 儲存體總管之類的工具,檢查 Blob 已從 "inputBlobPath" 被複製到 "outputBlobPath" (如您在變數中所指定)。
Create a data factory quickstart-adf...
{
"name": "quickstart-adf",
"type": "Microsoft.DataFactory/factories",
"properties": {
"provisioningState": "Succeeded",
"version": "2018-06-01"
},
"location": "eastus2"
}
Create a linked service AzureBlobStorage...
{
"name": "AzureBlobStorage",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;",
"encryptedCredential": "<encryptedCredential>"
}
}
}
Creating dataset BlobDelimitedDataset...
{
"name": "BlobDelimitedDataset",
"type": "Microsoft.DataFactory/factories/datasets",
"properties": {
"type": "DelimitedText",
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureBlobStorage"
},
"parameters": {
"container": {
"type": "String"
},
"path": {
"type": "String"
}
},
"typeProperties": {
"location": {
"container": {
"type": "Expression",
"value": "@dataset().container"
},
"type": "AzureBlobStorageLocation",
"fileName": {
"type": "Expression",
"value": "@dataset().path"
}
},
"columnDelimiter": ",",
"quoteChar": "\"",
"escapeChar": "\\",
"firstRowAsHeader": false
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.inputContainer",
"path": "@pipeline().parameters.inputPath"
}
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.outputContainer",
"path": "@pipeline().parameters.outputPath"
}
}
],
"name": "CopyFromBlobToBlob",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
}
}
],
"parameters": {
"inputContainer": {
"type": "String"
},
"inputPath": {
"type": "String"
},
"outputContainer": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 3aa26ffc-5bee-4db9-8bac-ccbc2d7b51c1
Checking pipeline run status...
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 1048,
"dataWritten": 1048,
"filesRead": 1,
"filesWritten": 1,
"sourcePeakConnections": 1,
"sinkPeakConnections": 1,
"copyDuration": 8,
"throughput": 1.048,
"errors": [],
"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
"usedDataIntegrationUnits": 4,
"billingReference": {
"activityType": "DataMovement",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
],
"totalBillableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
]
},
"usedParallelCopies": 1,
"executionDetails": [
{
"source": {
"type": "AzureBlobStorage"
},
"sink": {
"type": "AzureBlobStorage"
},
"status": "Succeeded",
"start": "2023-12-15T10:25:33.9991558Z",
"duration": 8,
"usedDataIntegrationUnits": 4,
"usedParallelCopies": 1,
"profile": {
"queue": {
"status": "Completed",
"duration": 5
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"listingSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"readingFromSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureBlobStorage",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 5,
"transferDuration": 1
}
}
],
"dataConsistencyVerification": {
"VerificationResult": "NotVerified"
}
}
Press any key to exit...
管道會自動在 adftutorial Blob 容器中建立輸出資料夾。 然後,它會將 emp.txt 檔案從輸入資料夾複製到輸出資料夾。
若要以程式設計方式刪除資料處理站,請將下列程式碼新增至程式:
Console.WriteLine("Deleting the data factory");
dataFactoryResource.Delete(WaitUntil.Completed);
在此範例中的管線會將資料從 Azure Blob 儲存體中的一個位置複製到其他位置。 瀏覽教學課程以了解使用 Data Factory 的更多案例。
事件
3月31日 下午11時 - 4月2日 下午11時
最大的網狀架構、Power BI 和 SQL 學習事件。 3 月 31 日 - 4 月 2 日。 使用程式代碼 FABINSIDER 來節省 $400 美元。
立即註冊訓練
認證
Microsoft Certified: Azure Data Engineer Associate - Certifications
展現對常見資料工程工作的了解,以使用多種 Azure 服務在 Microsoft Azure 上實作和管理資料工程工作負載。
文件
使用複製資料工具複製資料 - Azure Data Factory
建立 Azure Data Factory,然後使用「複製資料」工具,將資料從 Azure Blob 儲存體中的某個位置複製到另一個位置。
使用 Azure Data Factory 在 Blob 儲存體中複製資料 - Azure Data Factory
使用 PowerShell 建立 Azure Data Factory,以將資料從 Azure Blob 儲存體中的某個位置複製到另一個位置。
使用 REST API 建立 Azure Data Factory - Azure Data Factory
建立 Azure Data Factory 管線,以將資料從 Azure Blob 儲存體中的某個位置複製到另一個位置。