Действие ForEach в Фабрике данных Azure и Azure Synapse Analytics

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Действие ForEach определяет повторяющийся поток управления в конвейере Фабрики данных Azure или Synapse. Это действие используется для выполнения итерации коллекции и выполняет указанные в цикле действия. Реализация цикла этого действия аналогична структуре цикла Foreach на языках программирования.

Создание действия ForEach с помощью пользовательского интерфейса

Чтобы использовать действие ForEach в конвейере, выполните следующие действия:

  1. В качестве входных данных для действия ForEach можно использовать любую переменную типа массива или выходные данные из других действий. Чтобы создать переменную массива, выберите фон холста конвейера, а затем перейдите на вкладку Переменные, чтобы добавить переменную типа массива, как показано ниже.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Выполните поиск элемента ForEach на панели конвейера "Действия" и перетащите действие ForEach на холст конвейера.

  3. Выберите новое действие ForEach на панели холста, если оно еще не выбрано, и перейдите на вкладку Параметры, чтобы изменить сведения о нем.

    Shows the UI for a Filter activity.

  4. Выберите поле Элементы, а затем щелкните ссылку Добавить динамическое содержимое, чтобы открыть панель редактора динамического содержимого.

    Shows the  Add dynamic content  link for the Items property.

  5. Выберите входной массив для фильтрации в редакторе динамического содержимого. В этом примере мы выбираем переменную, созданную в первом шаге.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Выберите редактор действий в действии ForEach, чтобы добавить одно или несколько действий, выполняемых для каждого элемента в массиве входных элементов.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. В любых действиях, создаваемых в рамках действия ForEach, можно ссылаться на текущий элемент, который многократно используется действием ForEach, из списка Элементы. Вы можете ссылаться на текущий элемент в любом месте, где можно использовать динамическое выражение для указания значения свойства. В редакторе динамического содержимого выберите итератор ForEach для возврата текущего элемента.

    Shows the dynamic content editor with the ForEach iterator selected.

Синтаксис

Свойства описаны далее в этой статье. Свойство элементов является коллекцией, а каждый элемент в коллекции определяется с помощью @item(), как показано в следующем синтаксисе:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Свойства типа

Свойство Description Допустимые значения Обязательное поле
name Имя действия ForEach. Строка Да
type Должно быть задано значение ForEach. Строка Да
isSequential Определяет, как следует выполнять цикл: последовательно или параллельно. В параллельном режиме может выполняться одновременно не более 50 итераций цикла. Например, если действие ForEach выполняет итерацию действия копирования с 10 разными наборами данных источника и приемника и isSequential имеет значение false, все копии будут выполняться одновременно. Значение по умолчанию — False.

Если isSequential имеет значение false, убедитесь в наличии правильной конфигурации для запуска нескольких исполняемых файлов. В противном случае это свойство следует использовать с осторожностью, чтобы исключить конфликты записи. Дополнительные сведения см. в разделе Параллельное выполнение.
Логический № Значение по умолчанию — False.
batchCount Число пакетов, которое должно использоваться для управления количеством параллельного выполнения (в случае, если isSequential имеет значение false). Это максимальный предел параллелизма, однако для действий ForEach не всегда используется это число. Целое число (максимум 50) № Значение по умолчанию — 20.
Товаров Выражение, возвращающее массив JSON для итерации. Выражение (возвращающее массив JSON) Да
Процедуры Действия для выполнения. Список действий Да

Параллельное выполнение

Если isSequential имеет значение false, итерация действия выполняется параллельно (с максимум 50 параллельными итерациями). Этот параметр следует использовать с осторожностью. Этот метод подходит, если параллельные итерации выполняют запись в ту же папку, но в разные файлы. Если параллельные итерации выполняют запись в тот же файл, такой подход обычно вызывает ошибку.

Язык выражений итерации

В действии ForEach можно указать массив для итерации по свойству Элементы. Используйте @item() для итерации по одному перечислению в действии ForEach. Например, если items представляет собой массив: [1, 2, 3], @item() возвращает 1 в первой итерации, 2 во второй итерации и 3 в третьей итерации. Можно также использовать выражение наподобие @range(0,10), чтобы выполнять десять итераций, начиная с 0 и заканчивая 9.

Итерация отдельного действия

Сценарий: копирование из того же исходного файла в большом двоичном объекте Azure в несколько конечных файлов в большом двоичном объекте Azure.

Определение конвейера

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Определение набора данных большого двоичного объекта

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Значения параметров запуска

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Итерация нескольких действий

Возможно выполнять итерацию нескольких действий (например, копирование и веб-действие) в действии ForEach. В этом сценарии рекомендуется абстрагировать несколько действий в отдельном конвейере. Затем с помощью действия ExecutePipeline в конвейере с действием ForEach вызовите отдельный конвейер с несколькими действиями.

Синтаксис

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Пример

Сценарий: итерация по внутреннему конвейеру в действии ForEach с действием выполнения конвейера. Внутренний конвейер копируется с параметризованными определениями схемы.

Определение главного конвейера

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Определение внутреннего конвейера

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Определение исходного набора данных

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Определение набора данных приемника

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Параметры главного конвейера

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Агрегирование данных вывода

Чтобы агрегировать выходные данные для каждого действия foreach, используйте действия Variables (переменные) и Append Variable (добавить переменную).

Сначала объявите в конвейере переменнуюarray. После этого вызывайте действие Добавления переменной внутри каждого цикла foreach. Впоследствии вы можете получить агрегат из вашего массива.

Ограничения и методы обхода

Ниже описаны некоторые ограничения для действия ForEach и рекомендуемые обходные решения.

Ограничение Обходное решение
Не вкладывайте цикл ForEach в другой цикл ForEach (или в цикл Until). Разработайте двухуровневый конвейер, где внешний конвейер с внешним циклом ForEach выполняет итерацию по внутреннему конвейеру с вложенным циклом.
Количество операций параллельной обработки batchCount для действия ForEach не может превышать 50, максимальное число элементов — 100 000. Разработайте двухуровневый конвейер, где внешний конвейер с действием ForEach выполняет итерацию по внутреннему конвейеру.
SetVariable нельзя использовать внутри действия ForEach, которое выполняется параллельно, так как переменные являются глобальными для всего конвейера и не входят в область ForEach и другие действия. Можно использовать ForEach последовательно или действие Execute Pipeline внутри ForEach (переменную или параметр, обрабатываемые в дочернем конвейере).

Ознакомьтесь с другими поддерживаемыми действиями потока управления: