適用於:
Azure Data Factory
Azure Synapse Analytics
秘訣
Data Factory in Microsoft Fabric 是下一代的 Azure Data Factory,擁有更簡單的架構、內建 AI 及新功能。 如果你是資料整合新手,建議先從 Fabric Data Factory 開始。 現有的 ADF 工作負載可升級至 Fabric,以存取資料科學、即時分析與報告等新能力。
在這個教學中,你會建立一個資料工廠管線,將資料從 Azure Blob Storage 複製到 Azure SQL Database。 本教學課程中的設定模式從以檔案為基礎的資料存放區複製到關聯式資料存放區。 如需支援作為來源和接收的資料存放區清單,請參閱支援的資料存放區和格式。
您會在本教學課程中執行下列步驟:
- 建立資料處理站。
- 創建 Azure 儲存和 Azure SQL 資料庫的連接服務。
- 建立 Azure Blob 和 Azure SQL Database 資料集。
- 建立包含複製活動的管線。
- 啟動管道運行。
- 監視管線和活動執行。
這個教學使用 .NET SDK。 你也可以用其他機制來與Azure Data Factory互動,並參考Quickstarts下的樣本。
如果你沒有Azure訂閱,請在開始前建立一個free Azure帳號。
必要條件
- Azure Storage account。 您會使用 Blob 儲存體作為來源資料存放區。 如果你沒有Azure儲存帳號,請參考 建立一個通用儲存帳號。
- Azure SQL Database。 您會使用資料庫作為匯入資料存放區。 如果你在Azure SQL Database沒有資料庫,請參考在Azure SQL Database建立資料庫。
- Visual Studio。 本文的攻略使用 Visual Studio 2019。
- 針對.NET的Azure SDK。
- Microsoft Entra application。 如果您沒有 Microsoft Entra 應用程式,請參閱《如何:使用入口網站建立 Microsoft Entra 應用程式》的建立 Microsoft Entra 應用程式部分。 複製這些值,以便在後續步驟中使用:應用程式 (用戶端) 識別碼、驗證金鑰和目錄 (租用戶) 識別碼。 依照同一篇文章中的指示,將應用程式指派給「參與者」角色。
建立 Blob 和 SQL 資料表
現在,請建立來源 Blob 和接收端 SQL 資料表,為本教學課程準備 Azure Blob 和 Azure SQL Database。
建立來源 Blob
首先,請建立容器並將輸入文字檔上傳到其中,以建立來源 Blob:
開啟記事本。 複製下列文字並將其儲存在名為 inputEmp.txt 的本機檔案。
John|Doe Jane|Doe使用像 Azure Storage Explorer 這類工具建立 adfv2tutorial 容器,並將 inputEmp.txt 檔案上傳到容器中。
建立匯入 SQL 資料表
接著,建立接收端 SQL 資料表:
請使用以下 SQL 腳本在你的Azure SQL Database建立 dbo.emp 表格。
CREATE TABLE dbo.emp ( ID int IDENTITY(1,1) NOT NULL, FirstName varchar(50), LastName varchar(50) ) GO CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);允許 Azure 服務存取 SQL 資料庫。 請確保你允許存取伺服器中的 Azure 服務,讓資料工廠服務能將資料寫入 SQL 資料庫。 若要確認並開啟此設定,請執行下列步驟:
請到 Azure portal 管理你的 SQL 伺服器。 搜尋並選取 [SQL 伺服器]。
選取您的伺服器。
在 [SQL 伺服器] 功能表的 [安全性] 標題下,選取 [防火牆和虛擬網路]。
在 防火牆與虛擬網路頁面,允許Azure服務與資源存取此伺服器,選擇 ON。
建立一個 Visual Studio 專案
使用 Visual Studio,建立一個 C# .NET 控制台應用程式。
- 打開 Visual Studio。
- 在 [開始] 視窗中,選取 [建立新專案]。
- 在 Create a new project 視窗中,從專案類型列表中選擇 Console App (.NET Framework) 的 C# 版本。 然後選取下一步。
- 在配置你的新專案視窗中,輸入一個專案名稱為ADFv2Tutorial。 針對位置,瀏覽至要儲存專案的目錄,及/或建立該目錄。 然後選取建立。 新專案出現在 Visual Studio IDE 中。
安裝 NuGet 套件
接下來,使用 NuGet 套件管理員安裝必要的程式庫套件。
在選單列中,選擇 Tools>NuGet Package Manager>Package Manager Console。
在 Package Manager Console 面板中,執行以下指令來安裝套件。 關於 Azure Data Factory NuGet 套件的資訊,請參見 Microsoft.Azure。Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
建立資料處理站用戶端
請依照下列步驟,建立資料處理站用戶端。
開啟 Program.cs,然後使用下列程式碼來覆寫現有的
using陳述式,以新增命名空間的參考。using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;將下列程式碼新增至
Main方法,以設定變數。 將 14 個佔位符替換為您自己的值。目前 Data Factory 可用的 Azure 區域列表可在 依地區提供的產品 中查看。 在 [產品] 下拉式清單下,選擇 [瀏覽]>[分析]>[Data Factory]。 然後在 [區域] 下拉式清單中,選擇您感興趣的區域。 方格隨即出現,其中顯示您所選區域的 Data Factory 產品可用性狀態。
注意
Data Factory 使用的資料儲存區,例如 Azure Storage 和 Azure SQL Database,以及運算工具,例如 HDInsight,可能位於你選擇的 Data Factory 以外的其他區域。
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID to create the factory>"; string resourceGroup = "<your resource group to create the factory>"; string region = "<location to create the data factory in, such as East US>"; string dataFactoryName = "<name of data factory to create (must be globally unique)>"; // Specify the source Azure Blob information string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; string inputBlobPath = "adfv2tutorial/"; string inputBlobName = "inputEmp.txt"; // Specify the sink Azure SQL Database information string azureSqlConnString = "Server=tcp:<your server name>.database.windows.net,1433;" + "Database=<your database name>;" + "User ID=<your username>@<your server name>;" + "Password=<your password>;" + "Trusted_Connection=False;Encrypt=True;Connection Timeout=30"; string azureSqlTableName = "dbo.emp"; string storageLinkedServiceName = "AzureStorageLinkedService"; string sqlDbLinkedServiceName = "AzureSqlDbLinkedService"; string blobDatasetName = "BlobDataset"; string sqlDatasetName = "SqlDataset"; string pipelineName = "Adfv2TutorialBlobToSqlCopy";將下列程式碼新增至
Main方法,以建立DataFactoryManagementClient類別的執行個體。 您會使用此物件來建立資料處理站、連結服務、資料集和管道。 您也可以使用此物件來監視管線執行的詳細資訊。// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync( "https://management.azure.com/", cc ).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
建立資料處理站
將下列程式碼新增至 Main 方法,以建立「資料處理站」。
// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);
while (
client.Factories.Get(
resourceGroup, dataFactoryName
).ProvisioningState == "PendingCreation"
)
{
System.Threading.Thread.Sleep(1000);
}
建立連結服務
在本教學課程中,您會分別為來源和接收端建立兩個連結服務。
建立一個 Azure Storage 連結服務
將以下程式碼加入 Main 方法,建立一個 Azure Storage 連結服務。 有關支援屬性與細節,請參見 Azure Blob linked service properties。
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey
)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);
建立一個 Azure SQL Database 連結服務
將以下程式碼加入 Main 方法,建立一個 Azure SQL Database 連結服務。 有關支援屬性與細節,請參閱 Azure SQL Database 連結服務屬性。
// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");
LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
new AzureSqlDatabaseLinkedService
{
ConnectionString = new SecureString(azureSqlConnString)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);
建立資料集
在本節中,您將建立兩個資料集:一個用於源,另一個用於匯。
為來源 Azure Blob 儲存體建立資料集
將以下程式碼加入 Main 方法,該方法建立 Azure blob 資料集。 有關支援屬性與細節,請參見 Azure Blob dataset properties。
你定義一個代表 Azure Blob 來源資料的資料集。 這個 Blob 資料集指的是你在前一步建立的 Azure Storage 連結服務,並描述:
- 複製的來源 Blob 位置:
FolderPath和FileName - Blob 格式顯示如何解析內容:
TextFormat及其設定,例如欄位分隔符號。 - 資料結構,包括資料行名稱和資料類型,在此範例中會對應到接收端 SQL 資料表
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference {
ReferenceName = storageLinkedServiceName
},
FolderPath = inputBlobPath,
FileName = inputBlobName,
Format = new TextFormat { ColumnDelimiter = "|" },
Structure = new List<DatasetDataElement>
{
new DatasetDataElement { Name = "FirstName", Type = "String" },
new DatasetDataElement { Name = "LastName", Type = "String" }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);
為接收端 Azure SQL Database 建立資料集
將以下程式碼加入 Main 方法,建立 Azure SQL Database資料集。 有關支援屬性與細節,請參閱 Azure SQL Database dataset properties。
你定義一個代表 Azure SQL Database 匯入資料的資料集。 這個資料集指的是你在前一步建立的 Azure SQL Database 連結服務。 它也會指定 SQL 資料表,其中保存已複製的資料。
// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
new AzureSqlTableDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = sqlDbLinkedServiceName
},
TableName = azureSqlTableName
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);
建立新管線
將下列程式碼新增至 Main 方法,以建立含有複製活動的管線。 在本教學課程中,此管道包含一個活動:CopyActivity,可接受 Blob 資料集作為來源,也接受 SQL 資料集作為接收。 關於複製活動的細節,請參見Azure Data Factory中的複製活動。
// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToSQL",
Inputs = new List<DatasetReference>
{
new DatasetReference() { ReferenceName = blobDatasetName }
},
Outputs = new List<DatasetReference>
{
new DatasetReference { ReferenceName = sqlDatasetName }
},
Source = new BlobSource { },
Sink = new SqlSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);
建立管線執行
將下列程式碼新增至 Main 方法中,以觸發管線執行 。
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
監視管線執行
現在插入程式碼以檢查管線執行狀態,並取得複製活動執行的詳細資料。
將下列程式碼新增至
Main方法,以持續檢查管線執行的狀態,直到完成複製資料為止。// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId ); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }將下列程式碼新增至
Main方法,以取出複製活動執行的詳細資料,例如被讀取或寫入的資料大小。// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10) ); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams ); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(queryResponse.Value.First().Output); } else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
執行程式碼
選擇 [建置]>[建置解決方案] 來建置應用程式。 請選擇 [偵錯]>[開始偵錯],以啟動應用程式,然後確認管線執行。
主控台會列印建立 Data Factory、連結服務、資料集、管線及管線執行的進度。 然後檢查管線的運行狀態。 請等到您看見複製活動執行詳細資料,其中顯示讀取/寫入的資料大小。 接著,使用 SQL Server Management Studio(SSMS)或 Visual Studio 等工具,您可以連接到目的地的 Azure SQL Database,檢查您指定的目標資料表是否包含複製的資料。
範例輸出
Creating a data factory AdfV2Tutorial...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
}
}
}
}
Creating linked service AzureSqlDbLinkedService...
{
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": "adfv2tutorial/",
"fileName": "inputEmp.txt",
"format": {
"type": "TextFormat",
"columnDelimiter": "|"
}
},
"structure": [
{
"name": "FirstName",
"type": "String"
},
{
"name": "LastName",
"type": "String"
}
],
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
}
Creating dataset SqlDataset...
{
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "dbo.emp"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureSqlDbLinkedService"
}
}
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SqlDataset"
}
],
"name": "CopyFromBlobToSQL"
}
]
}
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 18,
"dataWritten": 28,
"rowsCopied": 2,
"copyDuration": 2,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 2
}
Press any key to exit...
相關內容
本範例中的管線會將資料從一個位置複製到 Azure Blob 儲存體中的另一個位置。 您已了解如何︰
- 建立資料處理站。
- 創建 Azure 儲存和 Azure SQL 資料庫的連接服務。
- 建立 Azure Blob 和 Azure SQL Database 資料集。
- 建立包含複製活動的管線。
- 啟動管道運行。
- 監視管線和活動執行。
進入下列教學課程,以了解如何將資料從內部部署複製到雲端: