ForEach-Aktivität in Azure Data Factory und Azure Synapse Analytics
GILT FÜR: Azure Data Factory Azure Synapse Analytics
Tipp
Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!
Die ForEach-Aktivität definiert eine sich wiederholende Ablaufsteuerung in einer Azure Data Factory- oder Synapse-Pipeline. Diese Aktivität wird verwendet, um eine Sammlung zu durchlaufen. Sie führt die angegebenen Aktivitäten in einer Schleife aus. Die Schleifenimplementierung dieser Aktivität ähnelt der Foreach-Schleifenstruktur in Programmiersprachen.
Erstellen einer ForEach-Aktivität mit Benutzeroberfläche
Führen Sie die folgenden Schritte aus, um eine ForEach-Aktivität in einer Pipeline zu verwenden:
Als Eingabe für Ihre ForEach-Aktivität können Sie eine beliebige Arraytypvariable oder Ausgaben anderer Aktivitäten verwenden. Wählen Sie zum Erstellen einer Arrayvariablen den Hintergrund der Pipelinecanvas und anschließend die Registerkarte Variablen aus, um wie unten gezeigt eine Arraytypvariable hinzuzufügen.
Suchen Sie im Bereich mit den Pipelineaktivitäten nach ForEach, und ziehen Sie eine ForEach-Aktivität in den Pipelinebereich.
Wählen Sie auf der Canvas die neue ForEach-Aktivität aus, sofern sie noch nicht ausgewählt ist, und wählen Sie anschließend die Registerkarte Einstellungen aus, um die Details zu bearbeiten.
Wählen Sie das Feld Elemente und anschließend den Link Dynamischen Inhalt hinzufügen aus, um den Bereich mit dem Editor für dynamische Inhalte zu öffnen.
Wählen Sie Ihr Eingabearray aus, das im Editor für dynamische Inhalte gefiltert werden soll. In diesem Beispiel wird die im ersten Schritt erstellte Variable ausgewählt.
Wählen Sie den Aktivitäten-Editor für die ForEach-Aktivität aus, um mindestens eine Aktivität hinzuzufügen, die für jedes Element im Eingabearray Elemente ausgeführt werden soll.
In allen Aktivitäten, die Sie innerhalb der ForEach-Aktivität erstellen, können Sie auf das aktuelle Element, das von der ForEach-Aktivität durchlaufen wird, über die Liste Elemente verweisen. Auf das aktuelle Element kann überall dort verwiesen werden, wo ein dynamischer Ausdruck zur Angabe eines Eigenschaftswert verwendet werden kann. Wählen Sie im Editor für dynamische Inhalte den ForEach-Iterator aus, um das aktuelle Element zurückzukehren.
Syntax
Die Eigenschaften werden weiter unten in diesem Artikel beschrieben. Die Eigenschaft „items“ ist die Sammlung, und auf die einzelnen Elemente in der Sammlung wird mit @item()
verwiesen, wie im folgenden Syntaxbeispiel gezeigt:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Typeigenschaften
Eigenschaft | BESCHREIBUNG | Zulässige Werte | Erforderlich |
---|---|---|---|
name | Name der ForEach-Aktivität. | String | Ja |
type | Muss auf ForEach festgelegt sein. | String | Ja |
isSequential | Gibt an, ob die Schleife sequenziell oder parallel ausgeführt werden soll. Es können maximal 50 Schleifeniterationen parallel ausgeführt werden. Beispiel: Bei einer ForEach-Aktivität, die eine Kopieraktivität mit 10 unterschiedlichen Quell- und Senkendatasets durchläuft, während isSequential auf „false“ festgelegt ist, werden alle Kopien gleichzeitig ausgeführt. Der Standardwert lautet False. Wenn „isSequential“ auf „false“ festgelegt ist, stellen Sie sicher, dass die Konfiguration die Ausführung mehrerer ausführbarer Dateien ermöglicht. Andernfalls sollte diese Eigenschaft vorsichtig verwendet werden, um Schreibkonflikte zu vermeiden. Weitere Informationen finden Sie im Abschnitt Parallele Ausführung. |
Boolean | Nein. Der Standardwert lautet False. |
batchCount | Batchanzahl, die zum Steuern der Anzahl der parallelen Ausführungen verwendet werden soll (wenn „isSequential“ auf „false“ festgelegt ist). Dies ist das obere Parallelitätslimit, aber die ForEach-Aktivität wird nicht immer mit dieser Zahl ausgeführt. | Ganze Zahl (maximal 50) | Nein. Der Standardwert ist 20. |
Items | Ein Ausdruck, der ein JSON-Array zurückgibt, das durchlaufen werden soll. | Ausdruck (der ein JSON-Array zurückgibt) | Ja |
activities | Die Aktivitäten, die ausgeführt werden sollen. | Liste der Aktivitäten | Ja |
Parallele Ausführung
Wenn isSequential auf FALSE festgelegt ist, werden maximal 50 Iterationen der Aktivität gleichzeitig ausgeführt. Diese Einstellung sollte vorsichtig verwendet werden. Wenn die gleichzeitigen Iterationen in den gleichen Ordner, aber in andere Dateien schreiben, ist dieser Ansatz gut. Wenn die gleichzeitigen Iterationen gleichzeitig in genau dieselbe Datei schreiben, verursacht dieser Ansatz wahrscheinlich einen Fehler.
Sprache für Iterationsausdrücke
Geben Sie in der ForEach-Aktivität ein Array an, das für die Eigenschaft items durchlaufen werden soll. Mit @item()
kann eine einzelne Enumeration in der ForEach-Aktivität durchlaufen werden. Beispiel: Wenn items ein Array wie [1, 2, 3] ist, gibt @item()
in der ersten Iteration 1 zurück, in der zweiten Iteration 2 und in der dritten Iteration 3. Sie können @range(0,10)
den auch „Gefällt mir“ Ausdruck verwenden, um zehnmal zu iterieren, beginnend mit 0 bis 9.
Durchlaufen einer einzelnen Aktivität
Szenario: Kopieren aus einer Quelldatei im Azure-Blob in mehrere Zieldateien im Azure-Blob.
Definition der Pipeline
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definition des Blobdatasets
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Verwendete Parameterwerte
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Durchlaufen mehrerer Aktivitäten
Es ist möglich, mehrere Aktivitäten in einer ForEach-Aktivität zu durchlaufen (zum Beispiel: Kopier- und Webaktivitäten). In diesem Szenario wird empfohlen, dass Sie mehrere Aktivitäten in eine separate Pipeline abstrahieren. Anschließend können Sie die ExecutePipeline-Aktivität in der Pipeline mit der ForEach-Aktivität verwenden, um die separate Pipeline mit mehreren Aktivitäten aufzurufen.
Syntax
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Beispiel
Szenario: Durchlaufen einer inneren Pipeline in einer ForEach-Aktivität mit der Aktivität „Pipeline ausführen“. Die innere Pipeline wird mit parametrisierten Schemadefinitionen kopiert.
Definition der Masterpipeline
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definition der inneren Pipeline
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definition des Quelldatasets
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definition des Senkendatasets
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parameter der Masterpipeline
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Aggregieren von Ausgaben
Um Ausgaben der foreach-Aktivität zu aggregieren, verwenden Sie Variables und die Aktivität Variable anfügen.
Deklarieren Sie zunächst eine array
-Variable in der Pipeline. Rufen Sie anschließend die Aktivität Variable anfügen innerhalb jeder foreach-Schleife auf. Anschließend können Sie die Aggregation aus Ihrem Array abrufen.
Einschränkungen und Problemumgehungen
Hier finden Sie einige Einschränkungen der ForEach-Aktivität sowie empfohlene Problemumgehungen.
Einschränkung | Problemumgehung |
---|---|
Eine ForEach-Schleife kann nicht in einer anderen ForEach-Schleife (oder in einer Until-Schleife) geschachtelt werden. | Entwerfen Sie eine Pipeline mit zwei Ebenen, bei der die äußere Pipeline mit der äußeren ForEach-Schleife eine innere Pipeline mit der geschachtelten Schleife durchläuft. |
Bei der ForEach-Aktivität gilt eine batchCount -Obergrenze von 50 für die parallele Verarbeitung sowie eine Obergrenze von 100.000 Elementen. |
Entwerfen Sie eine Pipeline mit zwei Ebenen, bei der die äußere Pipeline mit der ForEach-Aktivität eine innere Pipeline durchläuft. |
„SetVariable“ kann nicht innerhalb einer parallel ausgeführten ForEach-Aktivität verwendet werden, da die Variablen nicht für eine ForEach-Aktivität oder für eine andere Aktivität spezifisch, sondern für die gesamte Pipeline global sind. | Verwenden Sie ggf. ein sequenzielles ForEach-Element, oder verwenden Sie „Pipeline ausführen“ innerhalb von „ForEach“ (mit Behandlung der Variable/des Parameters in einer untergeordneten Pipeline). |
Zugehöriger Inhalt
Informationen zu weiteren unterstützten Ablaufsteuerungsaktivitäten: