Действие Потока данных в Фабрике данных Azure и Azure Synapse Analytics

ОБЛАСТЬ ПРИМЕНЕНИЯ:Фабрика данных Azure Azure Synapse Analytics

Используйте действие потока данных для преобразования и передачи данных посредством потоков данных для сопоставления. Если вы не знакомы с потоками данных, см. раздел Общие сведения о потоке данных для сопоставления.

Создание действия потока данных с помощью пользовательского интерфейса

Чтобы использовать действие потока данных в конвейере, выполните следующие шаги:

  1. Выполните поиск элемента Поток данных на панели конвейера «Действия» и перетащите действие Потока данных на холст конвейера.

  2. Выберите новое действие потока данных на панели холста, если оно еще не выбрано, и перейдите на вкладку Параметры, чтобы изменить сведения о нем.

    Изображение пользовательского интерфейса для действия потока данных.

  3. Ключ контрольной точки используется для установки контрольной точки, если поток данных используется для получения измененных данных. Его можно перезаписать. Действия потока данных используют значение GUID в качестве ключа контрольной точки вместо pipelinename+ activityname, чтобы иметь возможность постоянно отслеживать состояние получения измененных данных клиента, даже в случае выполнения каких-либо действий по переименованию. Все существующие действия потока данных будут использовать старый ключ шаблона для обеспечения обратной совместимости. Ниже приведен параметр ключа контрольной точки после публикации нового действия потока данных с включенной функцией получения измененных данных.

    Изображение пользовательского интерфейса для действия потока данных с ключом контрольной точки.

  4. Выберите существующий поток данных или создайте новый с помощью кнопки «Создать». Выберите другие параметры, необходимые для завершения настройки.

Синтаксис

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Свойства типа

Свойство Описание Допустимые значения Обязательно
поток данных Ссылка на выполняемый поток данных DataFlowReference Да
integrationRuntime Вычислительная среда, в которой выполняется поток данных. Если не указано, будет использоваться среда выполнения интеграции Azure с автоматическим разрешением. IntegrationRuntimeReference Нет
compute.coreCount Количество ядер, используемых в кластере Spark. Можно указать, только если используется среда выполнения интеграции Azure с автоматическим разрешением 8, 16, 32, 48, 80, 144, 272 Нет
compute.computeType Тип вычисления, используемого в кластере Spark. Можно указать, только если используется среда выполнения интеграции Azure с автоматическим разрешением "General", "MemoryOptimized" Нет
staging.linkedService Если вы используете источник или приемник Azure Synapse Analytics, укажите учетную запись хранения, используемую для промежуточного процесса Polybase.

Если в хранилище Azure настроена конечная точка службы виртуальной сети, необходимо использовать проверку подлинности по управляемому удостоверению. См. раздел Влияние использования конечных точек службы виртуальной сети со службой хранилища Azure. Ознакомьтесь с необходимыми конфигурациями для BLOB-объектов Azure и Azure Data Lake Storage 2-го поколения соответственно.
LinkedServiceReference Только если поток данных считывает или записывает данные в Azure Synapse Analytics
staging.folderPath Если вы используете источник или приемник Azure Synapse Analytics, для промежуточного процесса Polybase используется путь к папке в учетной записи хранения BLOB-объектов Строка Только если поток данных считывает или записывает данные в Azure Synapse Analytics
traceLevel Установка уровня ведения журнала для выполнения действия потока данных Прекрасно, грубая, нет Нет

Выполнение действия потока данных

Вычисление динамического размера потока данных в среде выполнения

Свойства "Число ядер" и "Тип вычисления" можно динамически установить для регулировки размера входящих данных источника в среде выполнения. Используйте такие действия конвейера, как "Поиск" или "Получение метаданных", чтобы определить размер данных набора данных источника. Затем используйте команду "Добавить динамическое содержимое" в свойствах "Действие потока данных". Можно выбрать объем вычислительных ресурсов Small (Малый), Medium (Средний) или Large (Большой). Можно также выбрать Custom (Настраиваемый) и настроить типы вычислений и количество ядер вручную.

Динамический поток данных

Ниже приведен небольшой учебный видеоролик, объясняющий эту технологию

Среда выполнения интеграции потока данных

Выберите среду выполнения Integration Runtime (IR), которая будет использоваться для выполнения действия потока данных. По умолчанию служба будет использовать среду выполнения интеграции Azure с автоматическим разрешением с четырьмя рабочими ядрами. Эта среда IR имеет тип вычислений общего назначения и выполняется в том же регионе, что и экземпляр службы. Для введенных в эксплуатацию конвейеров настоятельно рекомендуется создавать собственные среды выполнения интеграции Azure, определяющие конкретные регионы, тип вычислений, количество ядер и срок жизни для выполнения действий потока данных.

Минимальный тип вычислений общего назначения с конфигурацией 8 + 8 (всего 16 виртуальных ядер) и 10-минутным сроком жизни — это рекомендуемый минимум для большинства производственных рабочих нагрузок. Устанавливая небольшой срок жизни, Azure IR может поддерживать "горячий" кластер, который не требует нескольких минут для запуска, как "холодный"кластер. Дополнительные сведения см. в статье Среда выполнения интеграции Azure.

Azure Integration Runtime

Важно!

Выбор среды Integration Runtime в действии потока данных применяется только к запущенным выполнениям конвейера. Отладка конвейера с потоками данных выполняется в кластере, указанном в сеансе отладки.

PolyBase

Если вы используете Azure Synapse Analytics в качестве приемника или источника, необходимо выбрать промежуточное расположение для пакетной загрузки Polybase. Polybase обеспечивает групповую пакетную загрузку вместо построчной загрузки данных. Polybase радикально сокращает время загрузки в Azure Synapse Analytics.

Ключ контрольной точки

При использовании функции получения изменений для источников потока данных ADF будет автоматически поддерживать и администрировать контрольные точки. Ключ контрольной точки по умолчанию — это хэш имени потока данных и имени конвейера. Если для исходных таблиц или папок используется динамический шаблон, возможно, потребуется переопределить этот хэш и задать для него собственное значение ключа контрольной точки.

Уровень ведения журнала

Если не требуется, чтобы для каждого выполнения конвейера действий потока данных в журнал записывались все данные телеметрии, можно задать уровень ведения журнала "Обычный" или "Нет". При выполнении потоков данных в режиме "Подробный" (по умолчанию) у службы запрашивается ведение журнала всех действий на уровне отдельных секций во время преобразования данных. Это может быть ресурсоемкой операцией, поэтому включать режим подробного ведения журнала следует только при устранении неполадок. Такой подход может повысить общую производительность потоков данных и конвейеров. В режиме "Обычный" в журнал записывается только длительность преобразований, а в режиме "Нет" — только сводка длительностей.

Уровень ведения журнала

Свойства приемника

Функция группирования в потоках данных позволяет задавать порядок выполнения приемников, а также группировать приемники, используя один и тот же номер группы. Чтобы упростить управление группами, можно настроить службу для параллельного запуска приемников в одной группе. Можно задать продолжение работы группы приемников даже после того, как один из приемников обнаружит ошибку.

По умолчанию приемники потоков данных выполняются последовательно, то есть по очереди, пока не происходит сбой потока данных при обнаружении ошибки в приемнике. Кроме того, все приемники по умолчанию входят в одну группу, если только вы не перешли к свойствам потока данных и не установили разные приоритеты для приемников.

Свойства приемника

Только первая строка

Этот параметр доступен только для потоков данных, в которых включены приемники кэша для вывода в действие. Выходные данные из потока данных, которые вводятся непосредственно в ваш конвейер, ограничены 2 МБ. Установка параметра "Только первая строка" позволяет ограничить выходные данные потока данных при внедрении выходных данных действия потока данных непосредственно в конвейер.

Параметризация потоков данных

Параметризованные наборы данных

Если в потоке данных используются параметризованные наборы данных, задайте значения параметров на вкладке Параметры.

Параметры выполнения потока данных

Параметризованные потоки данных

Если поток данных является параметризованным, задайте динамические значения параметров потока данных на вкладке Параметры. Для назначения динамических или литеральных значений параметров можно использовать язык выражений конвейера или язык выражений потока данных. Дополнительные сведения см. в статье Параметры потока данных.

Параметризованные свойства вычислений.

Вы можете параметризовать количество ядер или тип вычислений, если вы используете среду выполнения интеграции Azure с автоматическим разрешением и указали значения для параметров compute.coreCount и compute.computeType.

Пример параметров выполнения потока данных

Отладка конвейера действия потока данных

Для выполнения конвейера отладки, выполняемого с действием потока данных, необходимо переключиться в режим отладки потока данных с помощью ползунка Отладки потока данных в верхней панели. Режим отладки позволяет запускать поток данных в активном кластере Spark. Дополнительные сведения см. в статье Режим отладки.

Снимок экрана: расположение кнопки

Конвейер отладки выполняется для активного кластера отладки, а не для среды выполнения интеграции, указанной в параметрах действия потока данных. При запуске режима отладки можно выбрать среду вычислений для отладки.

Мониторинг действия потока данных

В действии потока данных предусмотрена специальная процедура мониторинга, позволяющая просматривать сведения о секционировании, времени этапа и происхождении данных. Откройте панель мониторинга с помощью значка очков в разделе Действия. Дополнительные сведения см. в статье Мониторинг потоков данных.

Использование действия потока данных приводит к последующему действию

Действие потока данных выводит метрики, относящиеся к количеству строк, записанных в каждый приемник, и строк, считываемых из каждого источника. Эти результаты возвращаются в раздел output "Результат выполнения действия". Возвращаемые метрики имеют формат ниже JSON.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Например, чтобы получить количество строк, записанных в приемник с именем "sink1" в действии с именем "dataflowActivity", используйте @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Чтобы получить количество строк, прочитанных из источника с именем "source1", который использовался в этом приемнике, используйте @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Примечание

Если в приемнике записано ноль строк, он не будет отображаться в метриках. Существование можно проверить с помощью функции contains. Например, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') проверит, были ли строки записаны в sink1.

Дальнейшие действия

Ознакомьтесь с поддерживаемыми действиями потока управления: