Поделиться через


Активность Execute Pipeline в Azure Data Factory и Synapse Analytics

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Действие Execute Pipeline позволяет конвейеру Фабрики данных или Synapse вызвать другой конвейер.

Создание действия Execute Pipeline с помощью пользовательского интерфейса

Чтобы использовать действие Execute Pipeline в конвейере, выполните следующие действия.

  1. Найдите конвейер в области действий конвейера и перетащите действие Execute Pipeline на холст конвейера.

  2. Выберите новое действие Execute Pipeline на холсте, если оно еще не выбрано, и перейдите на вкладку Параметры, чтобы изменить сведения о нем.

    Отображает интерфейс для активности выполнения конвейера.

  3. Выберите существующий конвейер или создайте новый с помощью кнопки "Создать". Выберите другие параметры и настройте необходимые параметры для конвейера, чтобы завершить настройку.

Синтаксис

{
    "name": "MyPipeline",
    "properties": {
        "activities": [
            {
                "name": "ExecutePipelineActivity",
                "type": "ExecutePipeline",
                "typeProperties": {
                    "parameters": {                        
                        "mySourceDatasetFolderPath": {
                            "value": "@pipeline().parameters.mySourceDatasetFolderPath",
                            "type": "Expression"
                        }
                    },
                    "pipeline": {
                        "referenceName": "<InvokedPipelineName>",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true
                 }
            }
        ],
        "parameters": [
            {
                "mySourceDatasetFolderPath": {
                    "type": "String"
                }
            }
        ]
    }
}

Свойства типа

Свойство Описание Допустимые значения Обязательное поле
имя Имя действия исполнения конвейера. Строка Да
тип Должно иметь значение ExecutePipeline. Строка Да
конвейер Ссылка на зависимый конвейер, который вызывает этот конвейер. Объект ссылки конвейера имеет два свойства: referenceName и type. Свойство referenceName указывает имя эталонного конвейера. Для свойства type необходимо задать значение PipelineReference. PipelineReference Да
параметры Параметры для передачи в вызванный конвейер Объект JSON, сопоставляющий имена параметров со значениями аргументов Нет
waitOnCompletion Определяет, будет ли при выполнении действия ожидаться завершение выполнения зависимого конвейера. Значение по умолчанию — true. Логический Нет

Пример

В этом сценарии есть два конвейера:

  • Главный конвейер. Этот конвейер содержит одно действие выполнения конвейера, вызывающее вызванный конвейер. Главный конвейер принимает два параметра: masterSourceBlobContainer, masterSinkBlobContainer.
  • Вызванный конвейер - В этом конвейере выполняется одна операция копирования, которая копирует данные из источника Azure Blob в приемник Azure Blob. Вызванный конвейер принимает два параметра: sourceBlobContainer, sinkBlobContainer.

Определение главного конвейера

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ExecutePipeline",
        "typeProperties": {
          "pipeline": {
            "referenceName": "invokedPipeline",
            "type": "PipelineReference"
          },
          "parameters": {
            "sourceBlobContainer": {
              "value": "@pipeline().parameters.masterSourceBlobContainer",
              "type": "Expression"
            },
            "sinkBlobContainer": {
              "value": "@pipeline().parameters.masterSinkBlobContainer",
              "type": "Expression"
            }
          },
          "waitOnCompletion": true
        },
        "name": "MyExecutePipelineActivity"
      }
    ],
    "parameters": {
      "masterSourceBlobContainer": {
        "type": "String"
      },
      "masterSinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Определение вызываемого конвейера

{
  "name": "invokedPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "name": "CopyBlobtoBlob",
        "inputs": [
          {
            "referenceName": "SourceBlobDataset",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sinkBlobDataset",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Связанная служба

{
    "name": "BlobStorageLinkedService",
    "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*****"
    }
  }
}

Исходный набор данных

{
    "name": "SourceBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sourceBlobContainer",
        "type": "Expression"
      },
      "fileName": "salesforce.txt"
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Набор данных приемника

{
    "name": "sinkBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sinkBlobContainer",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Запуск конвейера

Чтобы запустить главный конвейер в этом примере, для параметров masterSourceBlobContainer и masterSinkBlobContainer нужно передать следующие значения:

{
  "masterSourceBlobContainer": "executetest",
  "masterSinkBlobContainer": "executesink"
}

Главный конвейер переадресовывает эти значения в вызванный конвейер, как показано в следующем примере:

{
    "type": "ExecutePipeline",
    "typeProperties": {
      "pipeline": {
        "referenceName": "invokedPipeline",
        "type": "PipelineReference"
      },
      "parameters": {
        "sourceBlobContainer": {
          "value": "@pipeline().parameters.masterSourceBlobContainer",
          "type": "Expression"
        },
        "sinkBlobContainer": {
          "value": "@pipeline().parameters.masterSinkBlobContainer",
          "type": "Expression"
        }
      },

      ....
}

Предупреждение

Выполнение действия конвейера передает параметр массива в виде строки дочернему конвейеру. Это связано с тем, что полезные данные передаются из родительского конвейера дочернему >объекту в виде строки. Мы можем увидеть это, когда проверяем входные данные, переданные в дочерний конвейер. Дополнительные сведения см. в этом разделе .

Ознакомьтесь с другими поддерживаемыми действиями потока управления: