分享方式:


在 Azure Data Factory 或 Azure Synapse Analytics 管線中使用自訂活動

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

您在 Azure Data Factory 或 Synapse 管線中可以使用兩種活動。

若要將資料移入/移出服務不支援的資料存放區,或以服務不支援的方式轉換/處理資料,您可以利用自己的資料移動或轉換邏輯建立自訂活動,然後在管線中使用活動。 自訂活動會在虛擬機器的 Azure Batch 集區上執行自訂程式碼邏輯。

注意

建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱 安裝 Azure PowerShell。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az

如果您不熟悉 Azure Batch 服務,請參閱下列文章:

重要

建立新的 Azure Batch 集區時,必須使用 'VirtualMachineConfiguration',而「不是」'CloudServiceConfiguration'。 如需詳細資訊,請參閱 Azure Batch 集區移轉指導

使用 UI 將自訂活動新增至管線

若要在管線中使用自訂活動,請完成下列步驟:

  1. 在管線 [活動] 窗格中搜尋「自訂」,然後將自訂活動拖曳至管線畫布。

  2. 在畫布上選取新的自訂活動 (如果尚未選取)。

  3. 選取 [Azure Batch] 索引標籤,以選取或建立新的 Azure Batch 連結服務,用以執行自訂活動。

    顯示自訂活動的UI。

  4. 選取 [設定] 索引標籤,並指定要在 Azure Batch 上執行的命令,以及選用的進階詳細資料。

    顯示自訂活動的 [設定] 索引標籤 UI。

Azure Batch 已連結的服務

下列 JSON 會定義範例 Azure Batch 已連結的服務。 如需詳細資訊,請參閱支援的計算環境

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

若要深入了解 Azure Batch 已連結的服務,請參閱計算已連結的服務一文。

自訂活動

下列 JSON 片段會定義具有範例自訂活動的管線。 活動定義具有對 Azure Batch 已連結的服務之參考。

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

在此範例中,helloworld.exe 是自訂應用程式,儲存在 resourceLinkedService 中使用之 Azure 儲存體帳戶的 customactv2/helloworld 資料夾。 自訂活動會提交此自訂應用程式以在 Azure Batch 上執行。 您可以取代在 Azure Batch 集區節點之目標作業系統上執行之任何偏好應用程式的命令。

下表描述此活動特有的屬性之名稱和描述。

屬性 描述 必要
NAME 管線中的活動名稱 Yes
description 說明活動用途的文字。 No
type 針對自訂活動,活動類型是自訂 Yes
linkedServiceName Azure Batch 的已連結的服務。 若要深入了解此已連結的服務,請參閱計算已連結的服務一文。 Yes
命令 要執行的自訂應用程式命令。 如果應用程式已經可以在 Azure Batch 集區節點上使用,則可以略過 resourceLinkedService 和 folderPath。 例如,您可以將命令指定為 cmd /c dir,該命令原生受 Windows Batch 集區節點支援。 Yes
resourceLinkedService 對儲存體帳戶 (自訂應用程式儲存所在) 的 Azure 儲存體已連結的服務 否 *
folderPath 自訂應用程式及其所有相依項目的資料夾路徑

如果您將相依性儲存在子資料夾中 (也就是 folderPath 下的階層式資料夾結構),當您將檔案複製到 Azure Batch 時,目前的資料夾結構會遭到壓平合併。 也就是所有檔案會複製到沒有子資料夾的單一資料夾中。 若要解決這個問題行為,請考慮壓縮檔案並複製壓縮的檔案,然後在所需位置中以自訂程式碼來將其解壓縮。
否 *
referenceObjects 現有已連結的服務和資料集的陣列。 參考的連結服務和資料集以 JSON 格式傳遞至自訂應用程式,讓您的自訂程式碼可以參考服務的資源 No
extendedProperties 使用者定義的屬性,可以傳遞至 JSON 格式的自訂應用程式,讓您的自訂程式碼可以參考其他屬性 No
retentionTimeInDays 檔案 (提交給自訂活動) 的保留時間。 預設值為 30 天。 No

* resourceLinkedServicefolderPath 屬性必須兩者都指定或都省略。

注意

如果您要在自訂活動中將連結服務當作 referenceObjects 傳遞,較安全的作法是傳遞已啟用 Azure Key Vault 的連結服務 (因為不含任何安全字串),並在程式碼中使用祕密名稱直接從Key Vault 擷取認證。 這裡有一個範例參考已啟用 AKV 的連結服務、從 Key Vault 擷取認證,然後在程式碼中存取儲存體。

注意

自訂活動中目前僅支援 resourceLinkedService 的 Azure Blob 儲存體,而且它是預設唯一建立的連結服務,沒有選擇其他連接器的選項,例如 ADLS Gen2。

自訂活動權限

自訂活動會將 Azure Batch 自動使用者帳戶設定為具有工作範圍 (預設的自動使用者規格) 的非系統管理員存取權。 您無法變更自動使用者帳戶的權限等級。 如需詳細資訊,請參閱在 Batch 中的使用者帳戶執行工作 | 自動使用者帳戶

執行命令

您可以使用自訂活動直接執行命令。 下列範例會在目標 Azure Batch 集區的節點上執行 "echo hello world" 命令,並將輸出列印至 stdout。

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

傳遞物件和屬性

此範例示範如何使用 referenceObjects 和 extendedProperties,將物件和使用者定義的屬性從服務傳遞至自訂應用程式。

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

當活動執行時,referenceObjects 和 extendedProperties 會儲存在部署至與 SampleApp.exe 相同執行資料夾的下列檔案:

  • activity.json

    儲存 extendedProperties 和自訂活動的屬性。

  • linkedServices.json

    儲存 referenceObjects 屬性中定義之已連結的服務的陣列。

  • datasets.json

    儲存 referenceObjects 屬性中定義之資料集的陣列。

下列範例程式碼示範 SampleApp.exe 如何從 JSON 檔案存取必要的資訊:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

擷取執行輸出

您可以使用下列 PowerShell 命令啟動管線的執行:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

管線正在執行時,您可以使用下列命令檢查執行的輸出:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

您的自訂應用程式的 stdoutstderr 會儲存至您在使用工作的 GUID 建立 Azure Batch 已連結的服務時,定義的 Azure 儲存體已連結的服務中的 adfjobs 容器。 您可以從活動執行輸出取得詳細路徑,如下列程式碼片段所示:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

如果要在下游活動中取用 stdout.txt 的內容,您可以在 "@activity('MyCustomActivity').output.outputs[0]" 運算式中取得 stdout.txt 檔案的路徑。

重要

  • activity.json、linkedServices.json 和 datasets.json 會儲存在 Batch 工作的執行階段資料夾。 在此範例中,activity.json、linkedServices.json 及 datasets.json 儲存在 https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/ 路徑中。 您必須視需要個別加以清除。
  • 如果已連結的服務使用自我裝載整合執行階段,機密資訊 (例如金鑰或密碼) 就會由自我裝載整合執行階段進行加密,以確保認證會保留在客戶定義的私人網路環境中。 某些機密欄位由您的自訂應用程式以這樣的方式參考時,可能會遺失。 視需要在 extendedProperties 中使用 SecureString,而不是使用已連結的服務參考。

將輸出傳遞到其他活動

您可以在自訂活動中從程式碼將自訂值傳回給服務。 您可以從應用程式中將它們寫入到 outputs.json執行此動作。 服務複製 outputs.json 的內容,並當作 customOutput 屬性的值附加至活動輸出。 (大小限制為 2 MB)。如果您想要在下游活動中取用 outputs.json 的內容,您可以使用運算式 @activity('<MyCustomActivity>').output.customOutput 來取得值。

擷取 SecureString 輸出

指定為 SecureString 型別的敏感性屬性值 (如本文中的部分範例所示),在使用者介面上的 [監視] 索引標籤中都加上遮罩。 但是,在實際的管線執行中,SecureString 屬性會在 activity.json 檔案中序列化為純文字形式的 JSON。 例如:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

此序列化不是真正的安全,而且也不是以安全為目的。 用意是提示服務將 [監視] 索引標籤中的值加上遮罩。

若要從自訂活動中存取 SecureString 型別的屬性,請讀取 activity.json 檔案 (與您的 .EXE 位在相同資料夾中)、將 JSON 還原序列化,然後存取 JSON 屬性 (extendedProperties => [propertyName] => value)。

Azure Batch 的自動調整

您也可以建立具有 自動調整 功能的 Azure Batch 集區。 例如,您可以用 0 個專用 VM 和基於暫止工作數目的自動調整公式,建立 Azure Batch 集區。

這裡的範例公式會有下列行為:當集區一開始建立時,它會以 1 部 VM 啟動。 $PendingTasks 計量會定義執行中 + 作用中 (已排入佇列) 狀態的工作數目。 公式會尋找過去 180 秒內的平均擱置中工作數目,並據以設定 TargetDedicated。 它會確保 TargetDedicated 一律不會超過 25 部 VM。 因此,當提交新工作時,集區會自動成長而且工作會完成,VM 會依序成為可用,而且自動調整規模功能會壓縮那些 VM。 您可以視需要調整 startingNumberOfVMs 及 maxNumberofVMs。

自動調整公式:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

如需詳細資訊,請參閱 自動調整 Azure Batch 集區中的計算節點

如果集區使用預設的 autoScaleEvaluationInterval,Batch 服務在執行自訂活動之前,可能需要 15-30 分鐘的時間準備 VM。 如果集區使用不同的 autoScaleEvaluationInterval,Batch 服務可能需要 autoScaleEvaluationInterval + 10 分鐘。

請參閱下列文章,其說明如何以其他方式轉換資料: