Verwenden benutzerdefinierter Aktivitäten in einer Pipeline von Azure Data Factory oder Azure Synapse Analytics

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

Es gibt zwei Aktivitätstypen, die Sie in einer Pipeline in Azure Data Factory oder Synapse verwenden können.

Wenn Sie Daten in einen bzw. aus einem von dem Dienst nicht unterstützten Datenspeicher verschieben oder auf eine Weise transformieren/verarbeiten müssen, die von dem Dienst nicht unterstützt wird, können Sie auch eine benutzerdefinierte Aktivität mit Ihrer eigenen Datenverschiebungs- bzw. -transformationslogik erstellen und in einer Pipeline verwenden. Die benutzerdefinierte Aktivität führt Ihre angepasste Codelogik in einem Azure Batch-Pool mit virtuellen Computern aus.

Hinweis

Es wird empfohlen, das Azure Az PowerShell-Modul für die Interaktion mit Azure zu verwenden. Informationen zu den ersten Schritten finden Sie unter Installieren des Azure Az PowerShell-Moduls. Informationen zum Migrieren zum Az PowerShell-Modul finden Sie unter Migrieren von Azure PowerShell von AzureRM zum Az-Modul.

Lesen Sie die folgenden Artikel, wenn Sie noch nicht mit dem Azure Batch-Dienst vertraut sind:

Wichtig

Beim Erstellen eines neuen Azure Batch-Pools muss „VirtualMachineConfiguration“ (und NICHT „CloudServiceConfiguration“) verwendet werden. Weitere Informationen finden Sie im Azure Batch Pool migration guidance (Leitfaden zur Migration eines Azure Batch-Pools).

Hinzufügen benutzerdefinierter Aktivitäten zu einer Pipeline über die Benutzeroberfläche

Führen Sie die folgenden Schritte aus, um eine benutzerdefinierte Aktivität in einer Pipeline zu verwenden:

  1. Suchen Sie im Bereich „Aktivitäten“ der Pipeline nach Benutzerdefiniert, und ziehen Sie eine benutzerdefinierte Aktivität in die Pipelinecanvas.

  2. Wählen Sie die neue benutzerdefinierte Aktivität in der Canvas aus, wenn sie noch nicht ausgewählt ist.

  3. Wählen Sie die Registerkarte Azure Batch aus, um einen neuen verknüpften Azure Batch-Dienst auszuwählen oder zu erstellen, der die benutzerdefinierte Aktivität ausführt.

    Shows the UI for a Custom activity.

  4. Wählen Sie die Registerkarte Einstellungen aus, und geben Sie einen Befehl an, der in Azure Batch ausgeführt werden soll, sowie optionale erweiterte Details.

    Shows the UI for the Settings tab for a Custom activity.

Verknüpfter Azure Batch-Dienst

Der folgende JSON-Code definiert einen verknüpften Azure Batch-Beispieldienst. Weitere Informationen finden Sie unter Unterstützte Compute-Umgebungen

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

Weitere Informationen zu diesem verknüpften Azure Batch-Dienst finden Sie im Artikel Von Azure Data Factory unterstützten Compute-Umgebungen.

Benutzerdefinierte Aktivität

Der folgende JSON-Ausschnitt definiert eine Pipeline mit einer einfachen benutzerdefinierten Aktivität. Die Aktivitätsdefinition enthält einen Verweis auf den verknüpften Azure Batch-Dienst.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

In diesem Beispiel ist „helloworld. exe“ eine benutzerdefinierte Anwendung, die im Ordner „customactv2/helloworld“ des Azure Storage-Kontos gespeichert ist, das im „resourceLinkedService“ verwendet wird. Die benutzerdefinierte Aktivität übermittelt diese benutzerdefinierte Anwendung an Azure Batch. Sie können den Befehl in jede beliebige Anwendung ändern, die im Zielbetriebssystem auf den Knoten im Azure Batch-Pool ausgeführt werden kann.

Die folgende Tabelle beschreibt die Namen und Eigenschaften, die für diese Aktivität spezifisch sind.

Eigenschaft Beschreibung Erforderlich
name Name der Aktivität in der Pipeline Ja
description Ein Text, der beschreibt, was mit der Aktivität ausgeführt wird. Nein
type Für die benutzerdefinierte Aktivität ist der Aktivitätstyp Custom. Ja
linkedServiceName Mit Azure Batch verknüpfter Dienst. Weitere Informationen zu diesem verknüpften Dienst finden Sie im Artikel Von Azure Data Factory unterstützten Compute-Umgebungen. Ja
command Befehl der benutzerdefinierten Anwendung, der ausgeführt werden soll. Wenn die Anwendung bereits auf dem Knoten des Azure Batch-Pools verfügbar ist, können „resourceLinkedService“ und „folderPath“ übersprungen werden. Sie können beispielsweise den Befehl cmd /c dir angeben, was vom Knoten des Azure Batch-Pools nativ unterstützt wird. Ja
resourceLinkedService Mit dem Speicherkonto verknüpfter Azure Storage-Dienst, in dem die benutzerdefinierte Anwendung gespeichert wird. Nein *
folderPath Pfad zum Ordner der benutzerdefinierten Anwendung und allen ihren abhängigen Elementen

Wenn Sie Abhängigkeiten in Unterordnern gespeichert haben (also in einer hierarchischen Ordnerstruktur unter folderPath), wird die Ordnerstruktur zurzeit abgeflacht, wenn die Dateien nach Azure Batch kopiert werden. Das heißt, alle Dateien werden in einen einzigen Ordner ohne Unterordner kopiert. Um dieses Verhalten zu umgehen, sollten Sie die Dateien komprimieren, die komprimierte Datei kopieren und dann mit benutzerdefiniertem Code am gewünschten Speicherort entpacken.
Nein *
referenceObjects Array vorhandener verknüpfter Dienste und Datasets. Die verknüpften Dienste und Datasets auf die verwiesen wird, werden im JSON-Format an die benutzerdefinierte Anwendung übergeben, sodass Ihr benutzerdefinierter Code auf die Dienstressourcen verweisen kann Nein
extendedProperties Benutzerdefinierte Eigenschaften, die im JSON-Format an die benutzerdefinierte Anwendung übergeben werden können, sodass Ihr benutzerdefinierter Code auf zusätzliche Eigenschaften verweisen kann. Nein
retentionTimeInDays Die Vermerkdauer für die Dateien, die für die benutzerdefinierte Aktivität übermittelt werden. Der Standardwert ist 30 Tage. Nein

* Die Eigenschaften resourceLinkedService und folderPath müssen entweder beide angegeben oder beide weggelassen werden.

Hinweis

Wenn Sie in der benutzerdefinierten Aktivität verknüpfte Dienste als referenceObjects übergeben, wird aus Sicherheitsgründen empfohlen, einen Azure Key Vault-fähigen verknüpften Dienst zu übergeben (da keine sicheren Zeichenfolgen enthalten sind) und die Anmeldeinformationen anhand des Geheimnisnamens direkt aus dem Code von Key Vault abzurufen. Sie finden hier ein Beispiel, das auf einen mit Azure Key Vault (AKV) verknüpften Dienst verweist, die Anmeldeinformationen von Key Vault abruft und dann auf den Speicher im Code zugreift.

Hinweis

Derzeit wird nur Azure Blob Storage für „resourceLinkedService“ in benutzerdefinierten Aktivitäten unterstützt. Außerdem ist dies der einzige verknüpfte Dienst, der standardmäßig erstellt wird, und es gibt keine Option zum Auswählen anderer Connectors wie ADLS Gen2.

Berechtigungen für benutzerdefinierte Aktivitäten

Die benutzerdefinierte Aktivität legt das automatische Benutzerkonto von Azure Batch auf Zugriff ohne Administratorrechte mit Aufgabenbereich (die Standardspezifikation für den automatischen Benutzer) fest. Sie können die Berechtigungsebene für das automatische Benutzerkonto nicht ändern. Weitere Informationen finden Sie unter Ausführen von Aufgaben unter Benutzerkonten in Batch | Automatische Benutzerkonten.

Ausführen von Befehlen

Mithilfe einer benutzerdefinierten Aktivität können Sie direkt einen Befehl ausführen. Im folgenden Beispiel wird der „Echo Hello World“-Befehl auf dem Azure Batch-Poolzielknoten ausgeführt und in „stdout“ ausgegeben.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Übergeben von Objekten und Eigenschaften

Dieses Beispiel zeigt, wie Sie das „referenceObjects“ und das „extendedProperties“ verwenden können, um Objekte und benutzerdefinierte Eigenschaften von dem Dienst an Ihre benutzerdefinierte Anwendung zu übergeben.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

Wenn die Aktivität ausgeführt wird, werden „referenceObjects“ und „extendedProperties“ in folgenden Dateien gespeichert, die im selben Ausführungsordner von „SampleApp.exe“ bereitgestellt werden:

  • activity.json

    Speichert „extendedProperties“ und Eigenschaften der benutzerdefinierten Aktivität.

  • linkedServices.json

    Speichert ein Array verknüpfter Dienste, die in der „referenceObjects“-Eigenschaft definiert sind.

  • datasets.json

    Speichert ein Array von Datasets, die in der „referenceObjects“-Eigenschaft definiert sind.

Der folgende Beispielcode veranschaulicht, wie „SampleApp.exe“ auf die erforderlichen Informationen in JSON-Dateien zugreifen kann:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Abrufen von Ausführungsausgaben

Sie können mit dem folgenden PowerShell-Befehl eine Pipelineausführung starten:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

Wenn die Pipeline ausgeführt wird, können Sie mit den folgenden Befehlen die Ausgabe der Ausführung überprüfen:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

Die Angaben für stdout und stderr Ihrer benutzerdefinierten Anwendung werden im Container adfjobs im verknüpften Azure Storage-Dienst gespeichert, den Sie beim Erstellen des verknüpften Azure Batch-Diensts mit einer GUID der Aufgabe definiert haben. Den detaillierten Pfad finden Sie in der Ausgabe der Aktivitätsausführung, wie im folgenden Ausschnitt gezeigt:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

Wenn Sie den Inhalt von „stdout.txt“ in Downstreamaktivitäten nutzen möchten, können Sie den Pfad zur Datei „stdout.txt“ im Ausdruck „@activity('MyCustomActivity').output.outputs[0]“ abrufen.

Wichtig

  • Die Dateien „activity.json“, „linkedServices.json“ und „datasets.json“ werden im Ordner „runtime“ der Batch-Aufgabe gespeichert. In diesem Beispiel werden die Dateien „activity.json“, „linkedServices.json“ und „datasets.json“ im Pfad https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/ gespeichert. Bei Bedarf müssen diese separat bereinigt werden.
  • Für verknüpfte Dienste, die die selbstgehostete Integration Runtime verwenden, werden vertrauliche Informationen wie Schlüssel oder Kennwörter von der selbstgehosteten Integration Runtime verschlüsselt, um sicherzustellen, dass die Anmeldeinformationen in der vom Kunden definierten privaten Netzwerkumgebung verbleiben. Einige sensible Felder können fehlen, wenn auf sie von Ihrem eigenen Anwendungscode auf diese Weise verwiesen wird. Verwenden Sie bei Bedarf „SecureString“ in „extendedProperties“ anstelle des Verweises auf den verknüpften Dienst.

Übergeben von Ausgaben an eine andere Aktivität

Sie können benutzerdefinierte Werte aus Ihrem Code in einer benutzerdefinierten Aktivität an den Dienst zurückgeben. Schreiben Sie sie hierzu von Ihrer Anwendung aus in outputs.json. Der Dienst kopiert den Inhalt von outputs.json und fügt ihn als Wert der Eigenschaft customOutput an die Aktivitätsausgabe an. (Die maximale Größe beträgt 2 MB.) Wenn Sie den Inhalt von outputs.json in Downstreamaktivitäten nutzen möchten, können Sie den Wert unter Verwendung des Ausdrucks @activity('<MyCustomActivity>').output.customOutput abrufen.

Abrufen von SecureString-Ausgaben

Vertrauliche Eigenschaftswerte, die als Typ SecureString definiert sind (wie in einigen der Beispiele in diesem Artikel gezeigt), werden auf der Registerkarte „Überwachung“ in der Benutzeroberfläche ausgeblendet. Bei der tatsächlichen Ausführung der Pipeline wird jedoch eine SecureString-Eigenschaft als JSON innerhalb der activity.json-Datei als Nur-Text serialisiert. Beispiel:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

Diese Serialisierung ist nicht wirklich sicher und soll auch nicht sicher sein. Die Absicht ist ein Hinweis an den Dienst zu geben, den Wert auf der Registerkarte Überwachung auszublenden.

Wenn Sie auf Eigenschaften des Typs SecureString von einer benutzerdefinierten Aktivität aus zugreifen möchten, lesen Sie die Datei activity.json, die sich in demselben Ordner wie Ihre EXE-Datei befindet, deserialisieren Sie den JSON-Code, und greifen Sie dann auf die JSON-Eigenschaft zu (extendedProperties => [propertyName] => Wert).

Automatische Skalierung von Azure Batch

Sie können einen Azure Batch-Pool auch mit dem Feature Automatisch skalieren erstellen. Sie können z. B. einen Azure Batch-Pool ohne dedizierte virtuelle Computer erstellen und dabei eine Formel für die automatische Skalierung angeben, die von der Anzahl der ausstehenden Aufgaben abhängig ist.

Mit dieser Beispielformel wird folgendes Verhalten erreicht: Nachdem der Pool erstellt wurde, wird er mit einer VM gestartet. Die Metrik „$PendingTasks“ legt die Anzahl der Aufgaben im ausgeführten und im aktiven (in der Warteschlange) Zustand fest. Die Formel sucht nach der durchschnittlichen Anzahl ausstehender Aufgaben in den letzten 180 Sekunden und legt TargetDedicated auf den entsprechenden Wert fest. Dadurch wird sichergestellt, dass TargetDedicated nie die Anzahl von 25 virtuellen Computern überschreitet. Wenn also neue Aufgaben gesendet werden, wächst der Pool automatisch an. Beim Abschluss von Aufgaben werden virtuelle Computer nacheinander frei, und durch die automatische Skalierung werden diese virtuellen Computer reduziert. startingNumberOfVMs und maxNumberofVMs können entsprechend den jeweiligen Anforderungen angepasst werden.

Formel für die automatische Skalierung:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Weitere Informationen hierzu finden Sie unter Automatisches Skalieren von Computeknoten in einem Azure Batch-Pool .

Wenn der Pool die Standardeinstellung für autoScaleEvaluationIntervalverwendet, kann es 15 bis 30 Minuten dauern, bis der Batch-Dienst den virtuellen Computer vorbereitet hat und die benutzerdefinierte Aktivität ausgeführt wird. Wenn der Pool eine andere Einstellung für „autoScaleEvaluationInterval“ nutzt, könnte der Batch-Dienst „autoScaleEvaluationInterval“ + 10 Minuten verwenden.

In den folgenden Artikeln erfahren Sie, wie Daten auf andere Weisen transformiert werden: