在 Azure Data Factory 或 Azure Synapse Analytics 管線中使用自訂活動

適用於: Azure Data Factory Azure Synapse Analytics

提示

Data Factory in Microsoft Fabric 是下一代的 Azure Data Factory,擁有更簡單的架構、內建 AI 及新功能。 如果你是資料整合新手,建議先從 Fabric Data Factory 開始。 現有的 ADF 工作負載可升級至 Fabric,以存取資料科學、即時分析與報告等新能力。

你可以在 Azure Data Factory 或 Synapse 管線中使用兩種活動。

若要將資料移入/移出服務不支援的資料存放區,或以服務不支援的方式轉換/處理資料,您可以利用自己的資料移動或轉換邏輯建立自訂活動,然後在管線中使用活動。 自訂活動會在一個Azure Batch虛擬機池上執行你自訂的程式碼邏輯。

備註

我們建議你使用 Azure Az PowerShell 模組來與 Azure 互動。 要開始,請參考 安裝 Azure PowerShell。 想了解如何遷移到 Az PowerShell 模組,請參考 Migrate Azure PowerShell from AzureRM to Az

如果您是 Azure Batch 服務的新手,請參考以下文章:

重要

建立新的 Azure Batch 池時,必須使用「VirtualMachineConfiguration」,而非「CloudServiceConfiguration」。

使用 UI 將自訂活動新增至管線

若要在管線中使用自訂活動,請完成下列步驟:

  1. 在管線 [活動] 窗格中搜尋「自訂」,然後將自訂活動拖曳至管線畫布。

  2. 在畫布上選取新的自訂活動項目(如果尚未選取的話)。

  3. 請選擇 Azure Batch 標籤,選擇或建立一個新的 Azure Batch 連結服務,該服務將執行自訂活動。

    顯示自訂活動的UI。

  4. 選擇 Settings 標籤,指定要在Azure Batch執行的指令,並可選進階細節。

    顯示 [自訂活動] 的 [設定] 索引標籤介面。

Azure Batch 連結服務

以下 JSON 定義了一個範例的 Azure Batch 連結服務。 如需詳細資訊,請參閱支援的計算環境

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

欲了解更多關於Azure Batch連結服務,請參閱 Compute 連結服務 文章。

自訂活動

下列 JSON 片段定義了一個具有簡單自訂活動的管線。 活動定義中參考了 Azure Batch 連結服務。

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

在這個範例中,helloworld.exe 是一個自訂應用程式,儲存在 resourceLinkedService 中使用的 Azure Storage 帳號的 customactv2/helloworld 資料夾中。 自訂活動會提交這個自訂應用程式,讓它在 Azure Batch 上執行。 你可以將指令替換到任何可在 Azure Batch Pool 節點目標作業系統上執行的偏好應用程式。

下表描述此活動特有的屬性之名稱和描述。

屬性 描述 必要
名稱 管線中的活動名稱 Yes
描述 說明活動用途的文字。
型別 針對自訂活動,活動類型是自訂 Yes
linkedServiceName 將服務連結至 Azure Batch。 若要深入了解此已連結的服務,請參閱計算已連結的服務一文。 Yes
命令 要執行的自訂應用程式命令。 如果應用程式已經在 Azure Batch 池節點上可用,則可以跳過 resourceLinkedService 和 folderPath。 例如,你可以指定指令為 cmd /c dir,這是 Windows Batch Pool 節點原生支援的。 Yes
resourceLinkedService Azure Storage 連結服務,用於連結存放自訂應用程式的儲存帳戶 否 *
資料夾路徑 自訂應用程式及其所有相依項目的資料夾路徑

如果你有相依性儲存在子資料夾裡——也就是在 folderPath 下的階層式資料夾結構中——當檔案複製到 Azure Batch 時,資料夾結構目前會被壓扁。 也就是所有檔案會複製到沒有子資料夾的單一資料夾中。 若要解決這個問題行為,請考慮壓縮檔案並複製壓縮的檔案,然後在所需位置中以自訂程式碼來將其解壓縮。
否 *
參考物件 現有已連結的服務和資料集的陣列。 參考的連結服務和資料集以 JSON 格式傳遞至自訂應用程式,讓您的自訂程式碼可以參考服務的資源
extendedProperties 使用者定義的屬性,可以傳遞至 JSON 格式的自訂應用程式,讓您的自訂程式碼可以參考其他屬性
保留時間(天) 提交給自訂活動的檔案的保留時間。 預設值為 30 天。

* resourceLinkedServicefolderPath 屬性必須兩者都指定或都省略。

備註

如果您在自訂活動中將連結服務作為 referenceObjects 傳遞,建議您將已啟用 Azure Key Vault 的連結服務傳遞進去 (因為其不包含任何安全字串),並在程式碼中直接使用祕密名稱從 Key Vault 擷取認證,這是良好的安全性做法。 你可以找到一個範例 here,它引用啟用 AKV 的連結服務,從 Key Vault 取得憑證,然後存取程式碼中的儲存空間。

備註

目前 resourceLinkedService 在自訂活動中只支援 Azure Blob 儲存,且它是唯一預設建立的連結服務,且無法選擇像 ADLS Gen2 這類其他連接器。

自訂活動權限

自訂活動將Azure Batch自動使用者帳號設定為非管理員存取,任務範圍(預設自動使用者規範)。 您無法變更自動使用者帳戶的權限等級。 如需詳細資訊,請參閱在 Batch 中的使用者帳戶執行工作 | 自動使用者帳戶

執行命令

您可以使用自訂活動直接執行命令。 以下範例在目標 Azure Batch Pool 節點執行「echo hello world」指令,並將輸出輸出輸出到 stdout。

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

傳遞物件和屬性

此範例示範如何使用 referenceObjects 和 extendedProperties,將物件和使用者定義的屬性從服務傳遞至自訂應用程式。

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

當活動執行時,referenceObjects 和 extendedProperties 會儲存在部署至與 SampleApp.exe 相同執行資料夾的下列檔案:

  • activity.json

    儲存延伸屬性與自訂活動的屬性。

  • linkedServices.json

    儲存 referenceObjects 屬性中定義之已連結的服務的陣列。

  • datasets.json

    儲存 referenceObjects 屬性中定義之資料集的陣列。

下列範例程式碼示範 SampleApp.exe 如何從 JSON 檔案存取必要的資訊:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

擷取執行輸出

您可以使用下列 PowerShell 命令啟動管線的執行:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

管道運行中時,您可以使用下列命令檢視執行結果:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

您的自訂應用程式的 stdoutstderr 會儲存至您在建立 Azure Batch 連結服務時所定義的 Azure 儲存體連結服務中的 adfjobs 容器,並以該工作的 GUID 命名。 您可以從活動執行輸出取得詳細路徑,如下列程式碼片段所示:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

如果要在下游活動中取用 stdout.txt 的內容,您可以在 "@activity('MyCustomActivity').output.outputs[0]" 運算式中取得 stdout.txt 檔案的路徑。

重要

  • activity.json、linkedServices.json 和 datasets.json 會儲存在 Batch 工作的執行階段資料夾。 在此範例中,activity.json、linkedServices.json 及 datasets.json 儲存在 https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/ 路徑中。 如有需要,您必須個別清除它們。
  • 對於使用 Self-Hosted Integration Runtime 的連結服務,敏感資訊如金鑰或密碼會被 Self-Hosted Integration Runtime 加密,以確保憑證留在客戶定義的私人網路環境中。 某些機密欄位由您的自訂應用程式以這樣的方式參考時,可能會遺失。 視需要在 extendedProperties 中使用 SecureString,而不是使用 Linked Service 參考。

將輸出傳遞到其他活動

您可以在自訂活動中從程式碼將自訂值傳回給服務。 您可以從應用程式中將它們寫入到 outputs.json執行此動作。 服務複製 outputs.json 的內容,並當作 customOutput 屬性的值附加至活動輸出。 (大小限制為 2 MB)。如果您想要在下游活動中取用 outputs.json 的內容,您可以使用運算式 @activity('<MyCustomActivity>').output.customOutput 來取得值。

擷取 SecureString 輸出

指定為 SecureString 類型的敏感性屬性值 (如本文中的部分範例所示),在使用者介面上的 [監視] 索引標籤中會被遮罩。 但是,在實際的管線執行中,SecureString 屬性會以純文字表示的 JSON 形式序列化在 activity.json 檔案中。 例如:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

此序列化不是真正的安全,而且也不是以安全為目的。 此意圖會提示服務在監視索引標籤中遮罩該值。

若要從自訂活動中存取 SecureString 型別的屬性,請讀取 activity.json 檔案 (與您的 .EXE 位在相同資料夾中)、將 JSON 還原序列化,然後存取 JSON 屬性 (extendedProperties => [propertyName] => value)。

自動調整的 Azure Batch

你也可以用 autoscale 功能建立一個 Azure Batch 池。 例如,你可以建立一個 Azure 批次池,裡面有 0 台專用虛擬機,並根據待處理任務數量自動調整比例。

此處的範例公式可達成下列行為:當集區最初建立時,會以 1 部 VM 啟動。 $PendingTasks 計量會定義執行中 + 作用中 (已排入佇列) 狀態的工作數目。 公式會尋找過去 180 秒內的平均擱置中工作數目,並據以設定 TargetDedicated。 它會確保 TargetDedicated 一律不會超過 25 部 VM。 因此,當提交新的工作時,集區會自動擴充;當工作完成時,VM 會逐一釋出,而自動調整會縮減這些 VM。 您可以視需要調整 startingNumberOfVMs 及 maxNumberofVMs。

自動調整公式:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

詳情請參見在 Azure Batch 池中自動調整計算節點

如果集區使用預設的 autoScaleEvaluationInterval,Batch 服務在執行自訂活動之前,可能需要 15-30 分鐘的時間準備 VM。 如果集區使用不同的 autoScaleEvaluationInterval,Batch 服務可能需要 autoScaleEvaluationInterval + 10 分鐘。

請參閱下列文章,其說明如何以其他方式轉換資料: