Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
DOTYCZY:
Azure Data Factory
Azure Synapse Analytics
Napiwek
Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !
Działanie ForEach określa powtarzający się przepływ sterowania w potoku usługi Azure Data Factory lub Synapse. To działanie służy do przeprowadzania iteracji po kolekcji i wykonuje określone działania w pętli. Implementacja pętli tego działania przypomina strukturę pętli Foreach w językach programowania.
Tworzenie działania ForEach za pomocą interfejsu użytkownika
Aby użyć aktywności ForEach w potokach, wykonaj następujące kroki:
Możesz użyć dowolnej zmiennej typu tablicy lub danych wyjściowych z innych działań jako danych wejściowych dla działania ForEach. Aby utworzyć zmienną tablicową, wybierz tło kanwy potoku, a następnie wybierz kartę Zmienne , aby dodać zmienną typu tablicy, jak pokazano poniżej.
Znajdź element ForEach w okienku Aktywności potoku i przeciągnij aktywność ForEach na kanwę potoku.
Wybierz nową aktywność ForEach w obszarze roboczym, jeśli nie została jeszcze wybrana, a następnie jego zakładkę Ustawienia, aby edytować szczegóły.
Wybierz pole Elementy, a następnie wybierz link Dodaj zawartość dynamiczną, aby otworzyć okienko edytora zawartości dynamicznej.
Wybierz tablicę wejściową, która ma być filtrowana w edytorze zawartości dynamicznej. W tym przykładzie wybieramy zmienną utworzoną w pierwszym kroku.
Wybierz edytor działań w działaniu ForEach, aby dodać co najmniej jedno działanie do wykonania dla każdego elementu w tablicy Elementy wejściowe.
W dowolnych działaniach, które tworzysz w ramach działania ForEach, możesz odwoływać się do bieżącego elementu, po którym działanie ForEach iteruje z listy Elementy. Możesz odwoływać się do bieżącego elementu w dowolnym miejscu, w którym można użyć wyrażenia dynamicznego, aby określić wartość właściwości. W edytorze zawartości dynamicznej wybierz iterator ForEach, aby zwrócić bieżący element.
Składnia
Właściwości są opisane w dalszej części tego artykułu. Właściwość items jest kolekcją, a każdy element w kolekcji jest określany przy użyciu @item(), jak pokazano w następującej składni:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Właściwości typu
| Nieruchomość | opis | Dozwolone wartości | Wymagane |
|---|---|---|---|
| nazwa | Nazwa działania dla każdego. | String | Tak |
| typ | Musi być ustawiona na ForEach | String | Tak |
| jestSekwencyjny | Określa, czy pętla powinna być wykonywana sekwencyjnie, czy równolegle. Maksymalnie 50 iteracji pętli można wykonywać jednocześnie. Na przykład, jeśli masz działanie ForEach iterujące nad działaniem kopiowania z 10 różnymi zestawami danych źródłowych i docelowych, przy ustawieniu isSequential na False, wszystkie kopie są wykonywane jednocześnie. Wartość domyślna to False. Jeśli parametr "isSequential" ma wartość False, upewnij się, że istnieje poprawna konfiguracja uruchamiania wielu plików wykonywalnych. W przeciwnym razie ta właściwość powinna być używana z ostrożnością, aby uniknąć konfliktów zapisu. Aby uzyskać więcej informacji, zobacz sekcję Równoległe wykonywanie . |
Wartość logiczna | Nie Wartość domyślna to False. |
| batchCount | Liczba partii używana do kontrolowania liczby równoległych wykonań (gdy parametr isSequential jest ustawiony na false). Jest to górny limit współbieżności, ale aktywność typu "for-each" nie zawsze będzie wykonywana przy tym limicie. | Liczba całkowita (maksymalnie 50) | Nr Wartość domyślna to 20. |
| Elementy | Wyrażenie, które zwraca tablicę JSON do przeiterowania. | Wyrażenie (które zwraca tablicę JSON) | Tak |
| Działania | Działania do wykonania. | Lista działań | Tak |
Wykonywanie równoległe
Jeśli parametr isSequential ma wartość false, działanie iteruje równolegle z maksymalnie 50 współbieżnymi iteracjami. To ustawienie powinno być używane z ostrożnością. Jeśli iteracje współbieżne zapisują w tym samym folderze, ale do różnych plików, takie podejście jest w porządku. Jeśli iteracje współbieżne zapisują jednocześnie do dokładnie tego samego pliku, to podejście najprawdopodobniej powoduje błąd.
Język wyrażeń iteracji
W działaniu ForEach podaj tablicę do iteracji dla właściwości items. Użyj @item() do iteracji w ramach pojedynczej enumeracji w działaniu ForEach. Jeśli na przykład elementy są tablicą: [1, 2, 3], @item() zwraca wartość 1 w pierwszej iteracji, 2 w drugiej iteracji i 3 w trzeciej iteracji. Możesz również użyć wyrażenia @range(0,10) podobnego do iterowania dziesięć razy, zaczynając od 0 i kończąc na 9.
Iterowanie pojedynczego działania
Scenariusz: Skopiuj z tego samego pliku źródłowego w usłudze Azure Blob do wielu plików docelowych w usłudze Azure Blob.
Definicja rurociągu
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definicja zestawu danych obiektów blob
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Uruchamianie wartości parametrów
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Iterowanie wielu działań
Istnieje możliwość iterowania wielu działań (na przykład kopiowania i działań internetowych) w działaniu ForEach. W tym scenariuszu zalecamy oddzielenie wielu działań w osobnym przebiegu. Następnie możesz użyć działania ExecutePipeline w potoku z działaniem ForEach, aby wywołać oddzielny potok z wieloma działaniami. Podczas iteracji wielu działań może wystąpić opóźnienie w zakończeniu pętli z powodu procesów agregacji i oczyszczania wykonywanych przez potok danych.
Składnia
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Przykład
Scenariusz: iteracja nad elementem InnerPipeline w działaniu ForEach za pomocą działania Execute Pipeline (Wykonywanie potoku). Wewnętrzny potok kopiuje schematy z sparametryzowanymi definicjami.
Definicja potoku głównego
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definicja potoku wewnętrznego
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definicja źródłowego zestawu danych
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definicja zestawu danych zbiornika
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parametry potoku głównego
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Agregowanie danych wyjściowych
Aby zagregować dane wyjściowe działania foreach , skorzystaj ze zmiennych i działania Dołącz zmienną .
Najpierw zadeklaruj zmienną array w potoku. Następnie wywołaj działanie Dołącz zmienną wewnątrz każdej pętli foreach . Następnie można pobrać agregację z tablicy.
Ograniczenia i rozwiązania
Poniżej przedstawiono pewne ograniczenia działania ForEach i sugerowane obejścia.
| Ograniczenie | Rozwiązanie |
|---|---|
| Nie można zagnieżdżać pętli ForEach wewnątrz innej pętli ForEach (lub pętli Until). | Zaprojektuj dwuwymiarowy potok, w którym zewnętrzny potok z zewnętrzną pętlą ForEach iteruje nad wewnętrznym potokiem zagnieżdżonym. |
Działanie ForEach ma maksymalnie batchCount 50 dla przetwarzania równoległego i maksymalnie 100 000 elementów. |
Zaprojektuj dwu poziomowy potok, w którym potok zewnętrzny z działaniem ForEach iteruje przez potok wewnętrzny. |
| Funkcji SetVariable nie można używać wewnątrz działania ForEach, które jest uruchamiane równolegle, ponieważ zmienne są globalne dla całego potoku, nie są one ograniczone do działania ForEach ani innych działań. | Rozważ użycie sekwencyjnego ForEach lub użyj Execute Pipeline w ramach ForEach (zmienna/parametr obsługiwany w podrzędnym Pipeline). |
Powiązana zawartość
Zobacz inne obsługiwane działania przepływu sterowania: