Azure Data Factory 和 Azure Synapse Analytics 中的管線及活動
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
重要
對於 Azure Machine Learning 工作室 (傳統) 的支援將於 2024 年 8 月 31 日結束。 建議您在該日期之前轉換成 Azure Machine Learning。
自 2021 年 12 月 1 日起,您就無法建立新的 Machine Learning 工作室 (傳統) 資源 (工作區與 Web 服務方案)。 在 2024 年 8 月 31 日之前,您可以繼續使用現有的 Machine Learning 工作室 (傳統) 實驗與 Web 服務。 如需詳細資訊,請參閱
Machine Learning 工作室 (傳統) 文件即將淘汰,未來將不再更新。
本文協助您了解 Azure Data Factory 和 Azure Synapse Analytics 中的管線和活動,並使用這些項目來為您的資料移動和資料處理案例建構端對端的資料導向工作流程。
概觀
Data Factory 或 Synapse 工作區可有一或多個管線。 管線是共同執行一項工作的多個活動邏輯群組。 例如,管線可能包含內嵌和清理記錄資料的一組活動,然後啟動對應資料流程來分析記錄資料。 管線可供以集合而不是個別的方式來管理活動。 您可部署和排程管線,而不是個別的活動。
管線中活動會定義要對資料執行的動作。 例如,您可以使用複製活動將資料從 SQL Server 複製到 Azure Blob 儲存體。 然後,使用資料流程活動或 Databricks 筆記本活動來處理 Blob 儲存體中的資料,並將其轉換成 Azure Synapse Analytics 集區,以作為建置商業智慧報表解決方案的基礎。
Azure Data Factory 和 Azure Synapse Analytics 有三種活動群組︰資料移動活動、資料轉換活動,以及控制活動。 一個活動可以接受零個或多個輸入資料集,並且會產生一個或多個輸出資料集。 下圖顯示管線、活動及資料集之間的關聯性:
輸入資料集代表管線中活動的輸入,輸出資料集則代表活動的輸出。 資料集會識別不同資料存放區中的資料,例如資料表、檔案、資料夾和文件。 建立資料集之後,您可以將其與管線中的活動搭配使用。 例如,資料集可以是複製活動或 HDInsightHive 活動的輸入/輸出資料集。 如需有關資料集的詳細資訊,請參閱 Azure Data Factory 中的資料集一文。
注意
每個管道的預設軟性限制活動數目上限為 80 個,包含容器的內部活動。
資料移動活動
Data Factory 中的複製活動會將資料從來源資料存放區複製到接收資料存放區。 Data Factory 支援本節的表格中所列的資料存放區。 可將來自任何來源的資料寫入任何接收器。
如需詳細資訊,請參閱複製活動 - 概觀一文。
按一下資料存放區,即可了解如何將資料複製到該存放區,以及從該存放區複製資料。
注意
如果連接器標示為「預覽」則您可以試用並提供意見反應給我們。 如果您需要依賴解決方案中的預覽連接器,請連絡 Azure 支援。
資料轉換活動
Azure Data Factory 和 Azure Synapse Analytics 支援下列可個別新增或與其他活動鏈結的轉換活動。
如需詳細資訊,請參閱資料轉換活動一文。
資料轉換活動 | 計算環境 |
---|---|
資料流程 | 由 Azure Data Factory 管理的 Apache Spark 叢集 |
Azure 函式 | Azure Functions |
Hive | HDInsight [Hadoop] |
Pig | HDInsight [Hadoop] |
MapReduce | HDInsight [Hadoop] |
Hadoop 串流 | HDInsight [Hadoop] |
Spark | HDInsight [Hadoop] |
ML 工作室 (傳統) 活動︰批次執行和更新資源 | Azure VM |
預存程序 | Azure SQL、Azure Synapse Analytics 或 SQL Server |
U-SQL | Azure Data Lake Analytics |
自訂活動 | Azure Batch |
Databricks Notebook | Azure Databricks |
Databricks Jar 活動 | Azure Databricks |
Databricks Python 活動 | Azure Databricks |
Synapse Notebook 活動 | Azure Synapse Analytics |
控制流程活動
支援下列的控制流程活動:
控制活動 | 描述 |
---|---|
附加變數 | 將值新增至現有的陣列變數。 |
執行管線 | 執行管線活動允許 Data Factory 或 Synapse 管線叫用另一個管線。 |
Filter | 將篩選條件運算式套用至輸入陣列 |
For Each | ForEach 活動會定義管線中重複的控制流程。 此活動用於逐一查看整個集合,然後以迴圈執行指定的活動。 此活動的迴圈實作與程式設計語言中的 Foreach 迴圈結構相似。 |
取得中繼資料 | GetMetadata 活動可以用來擷取 Data Factory 或 Synapse 管線中任何資料的中繼資料。 |
If Condition 活動 | 「If 條件」可用於根據評估為 True 或 False 的條件進行分支。 If 條件活動所提供的功能,與 If 陳述式在程式設計語言中提供的功能相同。 其會在條件評估為 true 時執行一系列的活動,並在條件評估為 false. 時執行另一系列的活動 |
查閱活動 | 「查閱活動」可用於讀取或查閱任何外部來源的記錄/資料表名稱/值。 此輸出可供後續活動進一步參考。 |
設定變數 | 設定現有變數的值。 |
Until 活動 | 實作 Do-Until 迴圈,類似於程式設計語言中的 Do-Until 迴圈結構。 它會以迴圈的方式執行一系列活動,直到與該活動相關聯的條件評估為 True 為止。 您可以指定 Until 活動的逾時值。 |
驗證活動 | 確定管線只有在參考資料集存在、符合指定的準則或已達到逾時,才會繼續執行。 |
Wait 活動 | 在管線中使用 Wait (等待) 活動時,管線便會等待指定的時間,然後再繼續執行後續的活動。 |
Web 活動 | 使用 Web 活動可以從管線呼叫自訂的 REST 端點。 您可以傳遞資料集和連結服務,以供活動取用和存取。 |
Webhook 活動 | 使用 Webhook 活動,呼叫端點並傳遞回呼 URL。 管線執行會等候要叫用的回呼,然後再繼續進行下一個活動。 |
使用 UI 建立管線
若要建立新的管線,請瀏覽至 Data Factory Studio 中的 [作者] 索引標籤 (以鉛筆圖示表示),接著按一下加號,並從功能表中選擇 [管線],然後從子功能表中再次選擇 [管線]。
Data Factory 會顯示管線編輯器,您可以在其中尋找:
- 可以在管線內使用的所有活動。
- 管線編輯器畫布,而活動會在新增至管線時出現在其中。
- 管線設定窗格,包括參數、變數、一般設定和輸出。
- 管線屬性窗格,其中可以設定管線名稱、選擇性描述和註釋。 此窗格也會顯示資料處理站內管線的任何相關項目。
管線 JSON
以 JSON 格式定義管線的方式如下:
{
"name": "PipelineName",
"properties":
{
"description": "pipeline description",
"activities":
[
],
"parameters": {
},
"concurrency": <your max pipeline concurrency>,
"annotations": [
]
}
}
標記 | 描述 | 類型 | 必要 |
---|---|---|---|
NAME | 管線的名稱。 指定代表管線所執行之動作的名稱。
|
String | Yes |
description | 指定說明管線用途的文字。 | String | No |
activities | [ 活動 ] 區段內可以有一或多個已定義的活動。 如需活動 JSON 元素的詳細資料,請參閱活動 JSON 一節。 | 陣列 | Yes |
parameters | parameters 區段可以在管道內定義一或多個參數,讓管道變得更有彈性而可重複使用。 | 清單 | No |
concurrency | 管線可以具有的並行執行數目上限。 根據預設,沒有上限。 如果達到並行限制,則會將額外的管線執行排入佇列,等到之前的管線完成後再執行 | 數字 | No |
annotations | 與管線相關聯的標籤清單 | 陣列 | No |
活動 JSON
[ 活動 ] 區段內可以有一或多個已定義的活動。 主要的活動類型有兩種:執行和控制活動。
執行活動
執行活動包括資料移動活動和資料轉換活動。 這些活動具有下列最上層結構:
{
"name": "Execution Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"linkedServiceName": "MyLinkedService",
"policy":
{
},
"dependsOn":
{
}
}
下表說明活動 JSON 定義內的屬性:
標記 | 描述 | 必要 |
---|---|---|
NAME | 活動名稱。 指定代表活動所執行之動作的名稱。
|
Yes |
description | 說明活動用途的文字 | Yes |
type | 活動的類型。 如需了解不同類型的活動,請參閱資料移動活動、資料轉換活動和控制活動各節。 | Yes |
linkedServiceName | 活動所使用的連結服務名稱。 活動可能需要您指定可連結至所需計算環境的連結服務。 |
是:HDInsight 活動、ML 工作室 (傳統) 批次評分活動和預存程序活動。 否:所有其他 |
typeProperties | typeProperties 區段中的屬性依每個活動的類型而定。 若要查看活動的類型屬性,請按一下先前小節中的活動連結。 | No |
原則 | 會影響活動之執行階段行為的原則。 這個屬性包含逾時和重試行為。 如果未指定,則會使用預設值。 如需詳細資訊,請參閱活動原則一節。 | No |
dependsOn | 此屬性用於定義活動相依性,以及後續活動如何相依於先前活動。 如需詳細資訊,請參閱活動相依性 | No |
活動原則
原則會影響活動的執行階段行為,也提供設定選項。 「活動原則」僅適用於執行活動。
活動原則 JSON 定義
{
"name": "MyPipelineName",
"properties": {
"activities": [
{
"name": "MyCopyBlobtoSqlActivity",
"type": "Copy",
"typeProperties": {
...
},
"policy": {
"timeout": "00:10:00",
"retry": 1,
"retryIntervalInSeconds": 60,
"secureOutput": true
}
}
],
"parameters": {
...
}
}
}
JSON 名稱 | 描述 | 允許的值 | 必要 |
---|---|---|---|
timeout | 指定活動執行的逾時。 | Timespan | 否。 預設逾時為 12 個小時,最小值為 10 分鐘。 |
retry | 重試次數上限 | 整數 | 否。 預設值為 0 |
retryIntervalInSeconds | 重試嘗試之間的延遲 (秒) | 整數 | 否。 預設值為 30 秒 |
secureOutput | 設定為 true 時,活動的輸出會被視為安全,且不會將其記錄以進行監視。 | 布林值 | 否。 預設為 False。 |
控制活動
控制活動具有下列最上層結構:
{
"name": "Control Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"dependsOn":
{
}
}
標記 | 描述 | 必要 |
---|---|---|
NAME | 活動名稱。 指定代表活動所執行之動作的名稱。
|
Yes |
description | 說明活動用途的文字 | Yes |
type | 活動的類型。 如需了解不同類型的活動,請參閱資料移動活動、資料轉換活動和控制活動各節。 | Yes |
typeProperties | typeProperties 區段中的屬性依每個活動的類型而定。 若要查看活動的類型屬性,請按一下先前小節中的活動連結。 | No |
dependsOn | 這個屬性用來定義活動相依性,以及後續活動如何相依於先前活動。 如需詳細資訊,請參閱活動相依性。 | No |
活動相依性
活動相依性可定義後續活動如何相依於先前活動,根據條件以決定是否繼續執行下一項工作。 一個活動可以根據不同的相依性條件,而相依於一或多個先前活動。
各種相依性條件包括:成功、失敗、略過、完成。
例如,如果管道有活動 A -> 活動 B,則可能發生的各種案例如下:
- 活動 B 對於活動 A 的相依性條件為成功:只有當活動 A 的最終狀態為成功時,活動 B 才會執行
- 活動 B 對於活動 A 的相依性條件為失敗:只有當活動 A 的最終狀態為失敗時,活動 B 才會執行
- 活動 B 對於活動 A 的相依性條件為完成:如果活動 A 的最終狀態為成功或失敗,活動 B 會執行
- 活動 B 對於活動 A 的相依性條件為略過:如果活動 A 的最終狀態為略過,活動 B 會執行。 「略過」發生於活動 X -> 活動 Y -> 活動 Z 的案例中,每個活動只有在前一個活動成功時才會執行。 如果活動 X 失敗,則活動 Y 的狀態為「略過」,因為永遠不會執行。 同樣地,活動 Z 的狀態也是「略過」。
範例:活動 2 相依於活動 1 成功
{
"name": "PipelineName",
"properties":
{
"description": "pipeline description",
"activities": [
{
"name": "MyFirstActivity",
"type": "Copy",
"typeProperties": {
},
"linkedServiceName": {
}
},
{
"name": "MySecondActivity",
"type": "Copy",
"typeProperties": {
},
"linkedServiceName": {
},
"dependsOn": [
{
"activity": "MyFirstActivity",
"dependencyConditions": [
"Succeeded"
]
}
]
}
],
"parameters": {
}
}
}
範例複製管線
在以下的範例管線中, Copy in the 活動 類型的活動。 在此範例中,複製活動會將資料從 Azure Blob 儲存體複製到 Azure SQL Database 中的資料庫。
{
"name": "CopyPipeline",
"properties": {
"description": "Copy data from a blob to Azure SQL table",
"activities": [
{
"name": "CopyFromBlobToSQL",
"type": "Copy",
"inputs": [
{
"name": "InputDataset"
}
],
"outputs": [
{
"name": "OutputDataset"
}
],
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000,
"writeBatchTimeout": "60:00:00"
}
},
"policy": {
"retry": 2,
"timeout": "01:00:00"
}
}
]
}
}
請注意下列幾點:
- 在活動區段中,只會有一個 type 設為 Copy 的活動。
- 活動的輸入設定為 InputDataset,活動的輸出則設定為 OutputDataset。 若要了解如何以 JSON 定義資料集,請參閱資料集一文。
- 在 typeProperties 區段中,來源類型指定為 BlobSource,接收類型指定為 SqlSink。 在資料移動活動區段中,按一下要作為來源或接收的資料存放區,以深入了解如何將資料移入/移出該資料存放區。
如需有關建立此管線的完整逐步解說,請參閱快速入門:建立資料處理站。
範例轉換管線
在以下的範例管線中, CopyActivity in the 活動 類型的活動。 在此範例中, HDInsight Hive 活動 會執行 Azure HDInsight Hadoop 叢集上的 Hive 指令碼檔案,來轉換 Azure Blob 儲存體的資料。
{
"name": "TransformPipeline",
"properties": {
"description": "My first Azure Data Factory pipeline",
"activities": [
{
"type": "HDInsightHive",
"typeProperties": {
"scriptPath": "adfgetstarted/script/partitionweblogs.hql",
"scriptLinkedService": "AzureStorageLinkedService",
"defines": {
"inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
"partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
}
},
"inputs": [
{
"name": "AzureBlobInput"
}
],
"outputs": [
{
"name": "AzureBlobOutput"
}
],
"policy": {
"retry": 3
},
"name": "RunSampleHiveActivity",
"linkedServiceName": "HDInsightOnDemandLinkedService"
}
]
}
}
請注意下列幾點:
- 在活動區段中,只會有一個 type 設為 HDInsightHive 的活動。
- Hive 指令碼檔案 partitionweblogs.hql 儲存於 Azure 儲存體帳戶 (由名為 AzureStorageLinkedService 的 scriptLinkedService 指定),且位於
adfgetstarted
容器的 script 資料夾中。 defines
區段可用來指定執行階段設定,這些設定會傳遞給 Hive 指令碼作為 Hive 設定值 (例如,${hiveconf:inputtable}
、${hiveconf:partitionedtable}
)。
每個轉換活動的 typeProperties 區段都不同。 若要了解轉換活動支援的 type 屬性,請在資料轉換活動中按一下該轉換活動。
如需有關建立此管道的完整逐步解說,請參閱教學課程:使用 Spark 轉換資料。
管線中的多個活動
前兩個範例管線都只包含一個活動。 您可以在一個管線中包含多個活動。 如果您在管線中有多個活動,而且後續活動不相依於先前活動,則活動可能會平行執行。
您可以使用活動相依性來鏈結兩個活動,以定義後續活動如何相依於先前活動,根據條件以決定是否繼續執行下一項工作。 一個活動可以根據不同的相依性條件,而相依於一或多個先前活動。
排程管道
管道由觸發程序排程。 有各種不同類型的觸發程序 (排程器觸發程序:可依時鐘排程來觸發管道;手動觸發程序:依需求觸發管道)。 如需觸發程序的詳細資訊,請參閱管道執行和觸發程序一文。
若要讓觸發程序啟動管道執行,您必須在觸發程序定義中包含特定管道的管道參考。 管道和觸發程序具有 n-m 關聯性。 多個觸發程序可以啟動單一管道,而相同的觸發程序可以啟動多個管道。 定義觸發程序之後,您必須啟動觸發程序,才會開始觸發管道。 如需觸發程序的詳細資訊,請參閱管道執行和觸發程序一文。
例如,假設您有排程器觸發程序「觸發程序 A」,而且我想要使用其來啟動我的管線 "MyCopyPipeline"。您可以如下列範例所示來定義觸發程序:
觸發程序 A 定義
{
"name": "TriggerA",
"properties": {
"type": "ScheduleTrigger",
"typeProperties": {
...
}
},
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyCopyPipeline"
},
"parameters": {
"copySourceName": "FileSource"
}
}
}
}