Datenflussaktivität in Azure Data Factory und Azure Synapse Analytics
GILT FÜR: Azure Data Factory Azure Synapse Analytics
Tipp
Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!
Verwenden Sie die Datenflussaktivität, um Daten mithilfe von Mapping Data Flow zu transformieren und zu verschieben. Wenn Sie mit Datenflüssen noch nicht vertraut sind, finden Sie weitere Informationen in der Übersicht über Mapping Data Flow.
Erstellen einer Datenflussaktivität über die Benutzeroberfläche
Führen Sie die folgenden Schritte aus, um eine Datenflussaktivität in einer Pipeline zu verwenden:
Suchen Sie im Bereich mit den Pipelineaktivitäten nach Datenfluss, und ziehen Sie eine Datenflussaktivität in den Pipelinebereich.
Wählen Sie die neue Datenflussaktivität auf der Canvas aus (wenn sie nicht bereits ausgewählt ist), und wählen Sie anschließend die Registerkarte Einstellungen aus, um die Details zu bearbeiten.
Mit dem Prüfpunktschlüssel wird der Prüfpunkt festgelegt, wenn der Datenfluss für die geänderte Datenerfassung verwendet wird. Sie können ihn überschreiben. Datenflussaktivitäten verwenden einen GUID-Wert als Prüfpunktschlüssel anstelle von „Pipelinename + Aktivitätsname“, damit der CDC-Status (Change Data Capture) des Kunden auch dann nachverfolgt werden kann, wenn Umbenennungsaktionen durchgeführt werden. Alle vorhandenen Datenflussaktivitäten verwenden aus Gründen der Abwärtskompatibilität den Schlüssel mit dem alten Muster. Die Option für Prüfpunktschlüssel nach der Veröffentlichung einer neuen Datenflussaktivität mit aktivierter Change Data Capture-Datenflussressource ist nachfolgend dargestellt:
Wählen Sie einen vorhandenen Datenfluss aus, oder erstellen Sie mithilfe der Schaltfläche „Neu“ einen neuen. Wählen Sie nach Bedarf weitere Optionen aus, um Ihre Konfiguration abzuschließen.
Syntax
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
Typeigenschaften
Eigenschaft | BESCHREIBUNG | Zulässige Werte | Erforderlich |
---|---|---|---|
dataflow | Der Verweis auf den Datenfluss, der ausgeführt wird. | DataFlowReference | Ja |
integrationRuntime | Die Computeumgebung, in der der Datenfluss ausgeführt wird. Wenn hier nichts angegeben wird, wird die Azure Integration Runtime mit automatischer Auflösung verwendet. | IntegrationRuntimeReference | Nein |
compute.coreCount | Die Anzahl von Kernen, die im Spark-Cluster verwendet werden. Kann nur angegeben werden, wenn die Azure Integration Runtime mit automatischer Auflösung verwendet wird. | 8, 16, 32, 48, 80, 144, 272 | Nein |
compute.computeType | Der Computetyp, der im Spark-Cluster verwendet wird. Kann nur angegeben werden, wenn die Azure Integration Runtime mit automatischer Auflösung verwendet wird. | „Allgemein“ | No |
staging.linkedService | Geben Sie das für PolyBase-Staging verwendete Speicherkonto an, wenn Sie eine Azure Synapse Analytics-Quelle oder -Senke verwenden. Wenn Ihre Azure Storage-Instanz mit einem VNET-Dienstendpunkt konfiguriert ist, müssen Sie die Authentifizierung per verwalteter Identität mit für das Speicherkonto aktivierter Option „Vertrauenswürdigen Microsoft-Diensten den Zugriff auf dieses Speicherkonto erlauben“ verwenden. Informationen hierzu finden Sie unter Auswirkungen der Verwendung von VNET-Dienstendpunkten mit Azure Storage. Außerdem erhalten Sie Informationen zu den erforderlichen Konfigurationen für Azure Blob bzw. Azure Data Lake Storage Gen2. |
LinkedServiceReference | Nur wenn der Datenfluss Daten in Azure Synapse Analytics liest oder schreibt |
staging.folderPath | Der für das PolyBase-Staging verwendete Ordnerpfad im Blobspeicherkonto, wenn Sie eine Azure Synapse Analytics-Quelle oder -Senke verwenden. | String | Nur wenn der Datenfluss Daten in Azure Synapse Analytics liest oder schreibt |
traceLevel | Festlegen des Protokolliergrads für die Ausführung Ihrer Datenflussaktivität | Fein, Grob, Keine | Nein |
Dynamisches Bestimmen der Größe der Computekapazität für den Datenfluss zur Laufzeit
Die Eigenschaften „Anzahl Kerne“ und „Computetyp“ können dynamisch festgelegt werden, um die Größe der eingehenden Quelldaten zur Laufzeit anzupassen. Verwenden Sie Pipelineaktivitäten wie „Nachschlagen“ oder „Metadaten abrufen“, um die Größe der Daten im Quelldatenset zu bestimmen. Verwenden Sie dann in den Eigenschaften der Datenflussaktivität „Dynamischen Inhalt hinzufügen“. Sie können kleine, mittlere oder große Computegrößen auswählen. Wählen Sie optional "Benutzerdefiniert" aus, und konfigurieren Sie die Berechnungstypen und die Anzahl der Kerne manuell.
Hier finden Sie ein kurzes Videotutorial, das diese Technik erläutert.
Datenfluss-Integration Runtime
Wählen Sie die Integration Runtime aus, die für die Ausführung Ihrer Datenflussaktivität verwendet werden soll. Standardmäßig verwendet der Dienst die Azure Integration Runtime mit automatischer Auflösung. Diese Integration Runtime weist einen allgemeinen Computetyp auf und wird in derselben Region wie Ihre Dienstinstanz ausgeführt. Für operationalisierte Pipelines wird dringend empfohlen, dass Sie Ihre eigene Azure Integration Runtime erstellen, die bestimmte Regionen, den Computetyp, die Kernanzahl und die Gültigkeitsdauer (TTL) für die Ausführung Ihrer Datenflussaktivität definiert.
Für die meisten Produktionsworkloads wird mindestens ein Computetyp „Universell“ mit einer Konfiguration von 8+8 (16 virtuellen Kernen insgesamt) und einer Gültigkeitsdauer (TTL) von zehn Minuten empfohlen. Durch Festlegen eines niedrigen TTL-Werts kann die Azure IR einen „warmen“ Cluster beibehalten, der nicht mehrere Minuten zum Starten benötigt, so wie ein „kalter“ Cluster. Weitere Informationen finden Sie unter Azure Integration Runtime.
Wichtig
Die Auswahl von Integration Runtime in der Datenflussaktivität bezieht sich nur auf getriggerte Ausführungen Ihrer Pipeline. Das Debuggen Ihrer Pipeline mit Datenflüssen wird auf dem in der Debugsitzung angegebenen Cluster ausgeführt.
PolyBase
Wenn Sie Azure Synapse Analytics als Senke oder Quelle verwenden, müssen Sie einen Stagingspeicherort für Ihren PolyBase-Batchladevorgang auswählen. PolyBase ermöglicht das Batchladen per Massenvorgang, anstatt die Daten zeilenweise zu laden. PolyBase verkürzt die Ladezeit in Azure Synapse Analytics erheblich.
Prüfpunktschlüssel
Wenn Sie die Änderungserfassungsoption für Datenflussquellen verwenden, verwaltet ADF den Prüfpunkt automatisch für Sie. Der Standardprüfpunktschlüssel ist ein Hash des Datenflussnamens und des Pipelinenamens. Wenn Sie ein dynamisches Muster für Ihre Quelltabellen oder -ordner verwenden, können Sie hier diesen Hash überschreiben und selbst einen Wert für den Prüfpunktschlüssel festlegen.
Protokolliergrad
Wenn es nicht erforderlich ist, dass jede Pipelineausführung Ihrer Datenflussaktivitäten alle ausführlichen Telemetrieprotokolle vollständig protokolliert, können Sie den Protokolliergrad optional auf „Standard“ oder „Kein“ festlegen. Wenn Sie Ihre Datenflüsse im Modus „Ausführlich“ (Standard) ausführen, fordern Sie vom Dienst, dass die Aktivität während der Datentransformation auf den einzelnen Partitionsebenen vollständig protokolliert wird. Da dies ein kostspieliger Vorgang sein kann, kann nur die ausschließliche Aktivierung von „Ausführlich“ bei der Problembehandlung den gesamten Datenfluss und die Pipelineleistung verbessern. Im Modus „Standard“ wird nur die Dauer der Transformation protokolliert, während der Modus „Kein“ lediglich eine Zusammenfassung der Werte für die Dauer bereitstellt.
Senkeneigenschaften
Mit der Gruppierungsfunktion in Datenflüssen können Sie sowohl die Ausführungsreihenfolge der Senken festlegen als auch Senken unter Verwendung derselben Gruppennummer gruppieren. Um die Verwaltung von Gruppen zu erleichtern, können Sie den Dienst anweisen, Senken aus der gleichen Gruppe parallel auszuführen. Sie können die Senkengruppe auch so festlegen, dass sie selbst dann fortgesetzt wird, wenn bei einer der Senken ein Fehler auftritt.
Beim Standardverhalten von Datenflusssenken wird jede Senke sequenziell nacheinander ausgeführt, und der Datenfluss schlägt fehl, wenn ein Fehler in der Senke auftritt. Außerdem werden alle Senken standardmäßig der gleichen Gruppe zugeordnet, es sei denn, Sie bearbeiten die Datenflusseigenschaften und legen unterschiedliche Prioritäten für die Senken fest.
Nur erste Zeile
Diese Option ist nur für Datenflüsse verfügbar, für die Cachesenken für "Ausgabe an Aktivität" aktiviert sind. Die Ausgabe des Datenflusses, der direkt in Ihre Pipeline eingefügt wird, ist auf 2 MB beschränkt. Wenn Sie "Nur erste Zeile" festlegen, können Sie die Datenausgabe des Datenflusses einschränken, wenn Sie die Ausgabe der Datenflussaktivität direkt in Ihre Pipeline einfügen.
Parametrisieren von Datenflüssen
Parametrisierte Datasets
Wenn im Datenfluss parametrisierte Datasets verwendet werden, legen Sie die Parameterwerte auf der Registerkarte Einstellungen fest.
Parametrisierte Datenflüsse
Wenn Ihr Datenfluss parametrisiert ist, legen Sie die dynamischen Werte der Datenflussparameter auf der Registerkarte Parameter fest. Sie können entweder die Pipelineausdruckssprache oder die Datenflussausdruckssprache verwenden, um dynamische oder literale Parameterwerte zuzuweisen. Weitere Informationen finden Sie unter Datenflussparameter.
Parametrisierte Compute-Eigenschaften.
Sie können die Anzahl von Kernen oder den Computetyp parametrisieren, wenn Sie die Azure Integration Runtime mit automatischer Auflösung verwenden, und Werte für „compute.corecount“ und „compute.computetype“ angeben.
Debuggen der Pipeline der Datenflussaktivität
Wenn Sie ein Debugging der Pipeline mit einer Datenflussaktivität ausführen möchten, müssen Sie den Datenfluss-Debugmodus über den Schieberegler Datenfluss debuggen auf der oberen Leiste aktivieren. Im Debugmodus können Sie den Datenfluss für einen aktiven Spark-Cluster ausführen. Weitere Informationen finden Sie unter Debugmodus.
Das Debuggen der Pipeline wird für den aktiven Debugcluster und nicht für die Integration Runtime-Umgebung ausgeführt, die in den Einstellungen für die Datenflussaktivität angegeben ist. Beim Starten des Debugmodus können Sie die Computeumgebung für das Debuggen auswählen.
Überwachen der Datenflussaktivität
Die Datenflussaktivität verfügt über eine besondere Überwachungsoberfläche, auf der Sie Informationen zu Partitionierung, Phasenzeit und Datenherkunft anzeigen können. Sie öffnen den Überwachungsbereich über das Brillensymbol unter Aktionen. Weitere Informationen finden Sie unter Überwachen von Datenflüssen.
Verwenden der Ergebnisse der Datenflussaktivität in einer nachfolgenden Aktivität
Die Datenflussaktivität gibt Metriken bezüglich der Anzahl der Zeilen aus, die in jeder Senke geschrieben werden und die in jeder Quelle gelesen werden. Diese Ergebnisse werden im Abschnitt output
des Ergebnisses der Aktivitätsausführung zurückgegeben. Die zurückgegebenen Metriken weisen das Format des folgenden JSON auf.
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
Verwenden Sie beispielsweise @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten
, um die Anzahl der in eine Senke namens „sink1“ in einer Aktivität namens „dataflowActivity“ geschriebenen Zeile zu erhalten.
Verwenden Sie @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead
, um die Anzahl der von einer Quelle namens „source1“ gelesenen Zeile abzurufen, die in dieser Senke verwendet wurde.
Hinweis
Wenn für eine Senke keine Zeilen geschrieben wurden, wird sie in Metriken nicht angezeigt. Vorhandene Zeilen können durch die contains
-Funktion überprüft werden. Beispielsweise überprüft contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1')
, ob in „sink1“ Zeilen geschrieben wurden.
Zugehöriger Inhalt
Weitere Informationen zu unterstützten Ablaufsteuerungsaktivitäten: