Attività ForEach in Azure Data Factory e Azure Synapse Analytics

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

L'attività ForEach definisce un flusso di controllo ripetuto in una pipeline di Azure Data Factory o Synapse. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione in ciclo Foreach nei linguaggi di programmazione.

Creare un'attività ForEach con l'interfaccia utente

Per usare un'attività ForEach in una pipeline, seguire questa procedura:

  1. È possibile usare qualsiasi variabile di tipo di matrice o output di altre attività come input per l'attività ForEach. Per creare una variabile di matrice, selezionare lo sfondo dell'area di disegno della pipeline e quindi selezionare la scheda Variabili per aggiungere una variabile di tipo matrice, come illustrato di seguito.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Cercare ForEach nel riquadro Attività pipeline e trascinare un'attività ForEach nell'area di disegno della pipeline.

  3. Selezionare la nuova attività ForEach nell'area di disegno se non è già selezionata e la relativa scheda Impostazioni per modificarne i dettagli.

    Shows the UI for a Filter activity.

  4. Selezionare il campo Elementi e quindi selezionare il collegamento Aggiungi contenuto dinamico per aprire il riquadro editor di contenuto dinamico.

    Shows the  Add dynamic content  link for the Items property.

  5. Selezionare la matrice di input da filtrare nell'editor di contenuto dinamico. In questo esempio viene selezionata la variabile creata nel primo passaggio.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Selezionare l'editor attività nell'attività ForEach per aggiungere una o più attività da eseguire per ogni elemento nella matrice di elementi di input.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. In tutte le attività create all'interno dell'attività ForEach, è possibile fare riferimento all'elemento corrente durante l'iterazione dell'attività ForEach dall'elenco Elementi . È possibile fare riferimento all'elemento corrente ovunque sia possibile usare un'espressione dinamica per specificare un valore della proprietà. Nell'editor di contenuto dinamico selezionare l'iteratore ForEach per restituire l'elemento corrente.

    Shows the dynamic content editor with the ForEach iterator selected.

Sintassi

Le proprietà sono descritte più avanti in questo articolo. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item(), come illustrato nella sintassi seguente:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Proprietà del tipo

Proprietà Descrizione Valori consentiti Obbligatoria
name Nome dell'attività ForEach. String
type Deve essere impostato su ForEach String
isSequential Specifica se il ciclo deve essere eseguito in sequenza o in parallelo. È possibile eseguire al massimo 50 iterazioni del ciclo contemporaneamente in parallelo. Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente. L'impostazione predefinita è False.

Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura. Per altre informazioni, vedere la sezione Esecuzione parallela.
Booleano No. L'impostazione predefinita è False.
batchCount Numero di batch da usare per controllare il numero di esecuzione parallela (quando isSequential è impostato su Falso). Questo è il limite di concorrenza superiore, ma l'attività for-each non viene sempre eseguita in questo numero Valore intero (massimo 50) No. Il valore predefinito è 20.
Articoli Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione. Espressione (che restituisce una matrice JSON)
Attività Le attività da eseguire. Elenco di attività

Esecuzione parallela

Se isSequential è impostato su false, l'attività esegue l'iterazione in parallelo con un massimo di 50 iterazioni simultanee. Questa impostazione deve essere usata con cautela. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.

Linguaggio delle espressioni di iterazione

Nell'attività ForEach specificare una matrice su cui eseguire l'iterazione per gli elementi della proprietà." Usare @item() per scorrere una singola enumerazione nell'attività ForEach. Ad esempio, se items è una matrice: [1, 2, 3], @item() restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione. È anche possibile usare l'espressione @range(0,10) like per scorrere dieci volte a partire da 0 terminando con 9.

Iterazione su una singola attività

Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.

Definizione della pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definizione del set di dati BLOB

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valori del parametro di esecuzione

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Eseguire l'iterazione su più attività

È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.

Sintassi

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Esempio

Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.

Definizione della pipeline master

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definizione della pipeline interna

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati di origine

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati sink

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametri della pipeline master

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregazione di output

Per aggregare gli output dell'attività foreach , usare variabili e attività Append Variable .

Innanzitutto, dichiarare una arrayvariabile nella pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach. Successivamente, è possibile recuperare l'aggregazione dall'array.

Limitazioni e soluzioni alternative

Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.

Limitazione Soluzione alternativa
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until). Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato.
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi. Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna.
SetVariable non può essere usato all'interno di un'attività ForEach eseguita in parallelo perché le variabili sono globali per l'intera pipeline, non hanno ambito per forEach o altre attività. È consigliabile usare ForEach sequenziale o usare Execute Pipeline in ForEach (variabile/parametro gestito nella pipeline figlio).

Vedere altre attività del flusso di controllo supportate: