Udostępnij przez


Działanie ForEach w usługach Azure Data Factory i Azure Synapse Analytics

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

Działanie ForEach określa powtarzający się przepływ sterowania w potoku usługi Azure Data Factory lub Synapse. To działanie służy do przeprowadzania iteracji po kolekcji i wykonuje określone działania w pętli. Implementacja pętli tego działania przypomina strukturę pętli Foreach w językach programowania.

Tworzenie działania ForEach za pomocą interfejsu użytkownika

Aby użyć aktywności ForEach w potokach, wykonaj następujące kroki:

  1. Możesz użyć dowolnej zmiennej typu tablicy lub danych wyjściowych z innych działań jako danych wejściowych dla działania ForEach. Aby utworzyć zmienną tablicową, wybierz tło kanwy potoku, a następnie wybierz kartę Zmienne , aby dodać zmienną typu tablicy, jak pokazano poniżej.

    Pokazuje pustą kanwę potoku ze zmienną typu tablicy dodaną do potoku.

  2. Znajdź element ForEach w okienku Aktywności potoku i przeciągnij aktywność ForEach na kanwę potoku.

  3. Wybierz nową aktywność ForEach w obszarze roboczym, jeśli nie została jeszcze wybrana, a następnie jego zakładkę Ustawienia, aby edytować szczegóły.

    Pokazuje interfejs użytkownika dla działania Filtr.

  4. Wybierz pole Elementy, a następnie wybierz link Dodaj zawartość dynamiczną, aby otworzyć okienko edytora zawartości dynamicznej.

    Pokazuje łącze „Dodaj zawartość dynamiczną” dla właściwości Items.

  5. Wybierz tablicę wejściową, która ma być filtrowana w edytorze zawartości dynamicznej. W tym przykładzie wybieramy zmienną utworzoną w pierwszym kroku.

    Pokazuje edytor zawartości dynamicznej ze zmienną utworzoną w pierwszym kroku wybranym

  6. Wybierz edytor działań w działaniu ForEach, aby dodać co najmniej jedno działanie do wykonania dla każdego elementu w tablicy Elementy wejściowe.

    Pokazuje przycisk Edytor aktywności w działaniu ForEach w oknie edytora potoku.

  7. W dowolnych działaniach, które tworzysz w ramach działania ForEach, możesz odwoływać się do bieżącego elementu, po którym działanie ForEach iteruje z listy Elementy. Możesz odwoływać się do bieżącego elementu w dowolnym miejscu, w którym można użyć wyrażenia dynamicznego, aby określić wartość właściwości. W edytorze zawartości dynamicznej wybierz iterator ForEach, aby zwrócić bieżący element.

    Wyświetla edytor zawartości dynamicznej z wybranym iteratorem ForEach.

Składnia

Właściwości są opisane w dalszej części tego artykułu. Właściwość items jest kolekcją, a każdy element w kolekcji jest określany przy użyciu @item(), jak pokazano w następującej składni:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Właściwości typu

Nieruchomość opis Dozwolone wartości Wymagane
nazwa Nazwa działania dla każdego. String Tak
typ Musi być ustawiona na ForEach String Tak
jestSekwencyjny Określa, czy pętla powinna być wykonywana sekwencyjnie, czy równolegle. Maksymalnie 50 iteracji pętli można wykonywać jednocześnie. Na przykład, jeśli masz działanie ForEach iterujące nad działaniem kopiowania z 10 różnymi zestawami danych źródłowych i docelowych, przy ustawieniu isSequential na False, wszystkie kopie są wykonywane jednocześnie. Wartość domyślna to False.

Jeśli parametr "isSequential" ma wartość False, upewnij się, że istnieje poprawna konfiguracja uruchamiania wielu plików wykonywalnych. W przeciwnym razie ta właściwość powinna być używana z ostrożnością, aby uniknąć konfliktów zapisu. Aby uzyskać więcej informacji, zobacz sekcję Równoległe wykonywanie .
Wartość logiczna Nie Wartość domyślna to False.
batchCount Liczba partii używana do kontrolowania liczby równoległych wykonań (gdy parametr isSequential jest ustawiony na false). Jest to górny limit współbieżności, ale aktywność typu "for-each" nie zawsze będzie wykonywana przy tym limicie. Liczba całkowita (maksymalnie 50) Nr Wartość domyślna to 20.
Elementy Wyrażenie, które zwraca tablicę JSON do przeiterowania. Wyrażenie (które zwraca tablicę JSON) Tak
Działania Działania do wykonania. Lista działań Tak

Wykonywanie równoległe

Jeśli parametr isSequential ma wartość false, działanie iteruje równolegle z maksymalnie 50 współbieżnymi iteracjami. To ustawienie powinno być używane z ostrożnością. Jeśli iteracje współbieżne zapisują w tym samym folderze, ale do różnych plików, takie podejście jest w porządku. Jeśli iteracje współbieżne zapisują jednocześnie do dokładnie tego samego pliku, to podejście najprawdopodobniej powoduje błąd.

Język wyrażeń iteracji

W działaniu ForEach podaj tablicę do iteracji dla właściwości items. Użyj @item() do iteracji w ramach pojedynczej enumeracji w działaniu ForEach. Jeśli na przykład elementy są tablicą: [1, 2, 3], @item() zwraca wartość 1 w pierwszej iteracji, 2 w drugiej iteracji i 3 w trzeciej iteracji. Możesz również użyć wyrażenia @range(0,10) podobnego do iterowania dziesięć razy, zaczynając od 0 i kończąc na 9.

Iterowanie pojedynczego działania

Scenariusz: Skopiuj z tego samego pliku źródłowego w usłudze Azure Blob do wielu plików docelowych w usłudze Azure Blob.

Definicja rurociągu

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definicja zestawu danych obiektów blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Uruchamianie wartości parametrów

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterowanie wielu działań

Istnieje możliwość iterowania wielu działań (na przykład kopiowania i działań internetowych) w działaniu ForEach. W tym scenariuszu zalecamy oddzielenie wielu działań w osobnym przebiegu. Następnie możesz użyć działania ExecutePipeline w potoku z działaniem ForEach, aby wywołać oddzielny potok z wieloma działaniami. Podczas iteracji wielu działań może wystąpić opóźnienie w zakończeniu pętli z powodu procesów agregacji i oczyszczania wykonywanych przez potok danych.

Składnia

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Przykład

Scenariusz: iteracja nad elementem InnerPipeline w działaniu ForEach za pomocą działania Execute Pipeline (Wykonywanie potoku). Wewnętrzny potok kopiuje schematy z sparametryzowanymi definicjami.

Definicja potoku głównego

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definicja potoku wewnętrznego

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definicja źródłowego zestawu danych

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definicja zestawu danych zbiornika

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametry potoku głównego

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregowanie danych wyjściowych

Aby zagregować dane wyjściowe działania foreach , skorzystaj ze zmiennych i działania Dołącz zmienną .

Najpierw zadeklaruj zmienną array w potoku. Następnie wywołaj działanie Dołącz zmienną wewnątrz każdej pętli foreach . Następnie można pobrać agregację z tablicy.

Ograniczenia i rozwiązania

Poniżej przedstawiono pewne ograniczenia działania ForEach i sugerowane obejścia.

Ograniczenie Rozwiązanie
Nie można zagnieżdżać pętli ForEach wewnątrz innej pętli ForEach (lub pętli Until). Zaprojektuj dwuwymiarowy potok, w którym zewnętrzny potok z zewnętrzną pętlą ForEach iteruje nad wewnętrznym potokiem zagnieżdżonym.
Działanie ForEach ma maksymalnie batchCount 50 dla przetwarzania równoległego i maksymalnie 100 000 elementów. Zaprojektuj dwu poziomowy potok, w którym potok zewnętrzny z działaniem ForEach iteruje przez potok wewnętrzny.
Funkcji SetVariable nie można używać wewnątrz działania ForEach, które jest uruchamiane równolegle, ponieważ zmienne są globalne dla całego potoku, nie są one ograniczone do działania ForEach ani innych działań. Rozważ użycie sekwencyjnego ForEach lub użyj Execute Pipeline w ramach ForEach (zmienna/parametr obsługiwany w podrzędnym Pipeline).

Zobacz inne obsługiwane działania przepływu sterowania: