Attività ForEach in Azure Data Factory e Azure Synapse Analytics
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .
L'attività ForEach definisce un flusso di controllo ripetuto in una pipeline di Azure Data Factory o Synapse. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione in ciclo Foreach nei linguaggi di programmazione.
Creare un'attività ForEach con l'interfaccia utente
Per usare un'attività ForEach in una pipeline, seguire questa procedura:
È possibile usare qualsiasi variabile di tipo di matrice o output di altre attività come input per l'attività ForEach. Per creare una variabile di matrice, selezionare lo sfondo dell'area di disegno della pipeline e quindi selezionare la scheda Variabili per aggiungere una variabile di tipo matrice, come illustrato di seguito.
Cercare ForEach nel riquadro Attività pipeline e trascinare un'attività ForEach nell'area di disegno della pipeline.
Selezionare la nuova attività ForEach nell'area di disegno se non è già selezionata e la relativa scheda Impostazioni per modificarne i dettagli.
Selezionare il campo Elementi e quindi selezionare il collegamento Aggiungi contenuto dinamico per aprire il riquadro editor di contenuto dinamico.
Selezionare la matrice di input da filtrare nell'editor di contenuto dinamico. In questo esempio viene selezionata la variabile creata nel primo passaggio.
Selezionare l'editor attività nell'attività ForEach per aggiungere una o più attività da eseguire per ogni elemento nella matrice di elementi di input.
In tutte le attività create all'interno dell'attività ForEach, è possibile fare riferimento all'elemento corrente durante l'iterazione dell'attività ForEach dall'elenco Elementi . È possibile fare riferimento all'elemento corrente ovunque sia possibile usare un'espressione dinamica per specificare un valore della proprietà. Nell'editor di contenuto dinamico selezionare l'iteratore ForEach per restituire l'elemento corrente.
Sintassi
Le proprietà sono descritte più avanti in questo articolo. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item()
, come illustrato nella sintassi seguente:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Proprietà del tipo
Proprietà | Descrizione | Valori consentiti | Obbligatoria |
---|---|---|---|
name | Nome dell'attività ForEach. | String | Sì |
type | Deve essere impostato su ForEach | String | Sì |
isSequential | Specifica se il ciclo deve essere eseguito in sequenza o in parallelo. È possibile eseguire al massimo 50 iterazioni del ciclo contemporaneamente in parallelo. Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente. L'impostazione predefinita è False. Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura. Per altre informazioni, vedere la sezione Esecuzione parallela. |
Booleano | No. L'impostazione predefinita è False. |
batchCount | Numero di batch da usare per controllare il numero di esecuzione parallela (quando isSequential è impostato su Falso). Questo è il limite di concorrenza superiore, ma l'attività for-each non viene sempre eseguita in questo numero | Valore intero (massimo 50) | No. Il valore predefinito è 20. |
Articoli | Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione. | Espressione (che restituisce una matrice JSON) | Sì |
Attività | Le attività da eseguire. | Elenco di attività | Sì |
Esecuzione parallela
Se isSequential è impostato su false, l'attività esegue l'iterazione in parallelo con un massimo di 50 iterazioni simultanee. Questa impostazione deve essere usata con cautela. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.
Linguaggio delle espressioni di iterazione
Nell'attività ForEach specificare una matrice su cui eseguire l'iterazione per gli elementi della proprietà." Usare @item()
per scorrere una singola enumerazione nell'attività ForEach. Ad esempio, se items è una matrice: [1, 2, 3], @item()
restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione. È anche possibile usare l'espressione @range(0,10)
like per scorrere dieci volte a partire da 0 terminando con 9.
Iterazione su una singola attività
Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.
Definizione della pipeline
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definizione del set di dati BLOB
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Valori del parametro di esecuzione
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Eseguire l'iterazione su più attività
È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.
Sintassi
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Esempio
Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.
Definizione della pipeline master
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definizione della pipeline interna
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definizione del set di dati di origine
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definizione del set di dati sink
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parametri della pipeline master
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Aggregazione di output
Per aggregare gli output dell'attività foreach , usare variabili e attività Append Variable .
Innanzitutto, dichiarare una array
variabile nella pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach. Successivamente, è possibile recuperare l'aggregazione dall'array.
Limitazioni e soluzioni alternative
Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.
Limitazione | Soluzione alternativa |
---|---|
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until). | Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato. |
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi. |
Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna. |
SetVariable non può essere usato all'interno di un'attività ForEach eseguita in parallelo perché le variabili sono globali per l'intera pipeline, non hanno ambito per forEach o altre attività. | È consigliabile usare ForEach sequenziale o usare Execute Pipeline in ForEach (variabile/parametro gestito nella pipeline figlio). |
Contenuto correlato
Vedere altre attività del flusso di controllo supportate: