Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Конвейер можно запустить в рамках рабочего процесса обработки данных с помощью заданий Lakeflow, Apache Airflow или Фабрики данных Azure.
Jobs
Вы можете оркестрировать несколько задач в задании Databricks для реализации рабочего процесса обработки данных. Чтобы включить конвейер в задание, используйте задачу Конвейера при создании задания. См. задачу конвейера для заданий.
Apache Airflow
Apache Airflow — это решение с открытым исходным кодом для управления рабочими процессами данных и планирования. Airflow отображает рабочие процессы в виде направленных ациклических графов операций (DAG). Вы определяете рабочий процесс в файле Python, и Airflow управляет его планированием и выполнением. Информация о том, как установить и использовать Airflow с Azure Databricks, см. в статье "Orchestrate Lakeflow Jobs with Apache Airflow".
Чтобы запустить конвейер в рамках рабочего процесса Airflow, используйте DatabricksSubmitRunOperator.
Требования
Для использования поддержки Airflow для декларативных конвейеров Lakeflow Spark требуется следующее:
- Airflow версии 2.1.0 или более поздней.
- Пакет поставщика Databricks версии 2.1.0 или более поздний.
Example
В следующем примере создается DAG Airflow, который активирует обновление для конвейера с идентификатором 8279d543-063c-4d63-9926-dae38e35ce8b.
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Замените CONNECTION_ID идентификатором подключения Airflow к рабочей области.
Сохраните этот пример в каталоге airflow/dags и используйте пользовательский интерфейс Airflow для просмотра и активации DAG. Используйте пользовательский интерфейс конвейера для просмотра сведений об обновлении конвейера.
Фабрика данных Azure
Замечание
Декларативные конвейеры Spark Lakeflow и Фабрика данных Azure включают параметры настройки количества повторных попыток при возникновении сбоя. Если значения повторных попыток настроены в конвейере и в действии Фабрики данных Azure, вызывающего конвейер, число повторных попыток — это значение повторных попыток фабрики данных Azure, умноженное на значение повторных попыток конвейера.
Например, если обновление конвейера завершается ошибкой, Декларативные конвейеры Lakeflow Spark повторяют обновление до пяти раз по умолчанию. Если для фабрики данных Azure задано три повторные попытки, а в конвейере по умолчанию используется пять повторных попыток, то неудачный конвейер может быть выполнен повторно до пятнадцати раз. Чтобы избежать чрезмерной попытки повторных попыток при сбое обновлений конвейера, Databricks рекомендует ограничить количество повторных попыток при настройке конвейера или действия Фабрики данных Azure, вызывающего конвейер.
Чтобы изменить конфигурацию повторных попыток для конвейера, используйте pipelines.numUpdateRetryAttempts параметр при настройке конвейера.
Фабрика данных Azure — это облачная служба ETL, которая позволяет управлять рабочими процессами интеграции и преобразования данных. Фабрика данных Azure напрямую поддерживает выполнение задач Azure Databricks в рабочем процессе, включая записные книжки, jar-задачи и скрипты Python. Вы также можете включить конвейер в рабочий процесс, вызвав REST API конвейера из веб-активности в фабрике данных Azure. Например, чтобы инициировать обновление пайплайна из фабрики данных Azure:
Создайте фабрику данных или откройте существующую фабрику данных.
После создания откройте страницу фабрики данных и щелкните плитку Open Azure Data Factory Studio . Появится пользовательский интерфейс Фабрики данных Azure.
Создайте новый конвейер в Фабрике данных Azure, выбрав Новый из раскрывающегося меню в пользовательском интерфейсе Azure Data Factory Studio.
В панели инструментов "Действия" разверните категорию "Общие" и перетащите Web-действие на холст конвейера. Перейдите на вкладку "Параметры" и введите следующие значения:
Замечание
В качестве рекомендации по обеспечению безопасности при аутентификации с использованием автоматизированных инструментов, систем, скриптов и приложений Databricks рекомендует использовать токены личного доступа, принадлежащие служебным принципалам, а не пользователям рабочей области. Сведения о создании маркеров для субъектов-служб см. в разделе "Управление маркерами" для субъекта-службы.
URL-адрес:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.Замените
<get-workspace-instance>.Замените
<pipeline-id>идентификатором конвейера.Метод. Выберите POST в раскрывающемся меню.
Заголовки: нажмите кнопку +Создать. В текстовом поле "Имя" введите
Authorization. В текстовом поле "Значение " введитеBearer <personal-access-token>.Замените
<personal-access-token>на личный маркер доступа Azure Databricks.Текст. Чтобы передать дополнительные параметры запроса, введите документ JSON, содержащий параметры. Например, чтобы запустить обновление и повторно обработать все данные для конвейера:
{"full_refresh": "true"}Если нет дополнительных параметров запроса, введите пустые фигурные скобки ({}).
Чтобы протестировать веб-активность, щелкните Отладка на панели инструментов конвейера в интерфейсе Data Factory. Выходные данные и состояние выполнения, включая ошибки, отображаются на вкладке "Выходные данные " конвейера Фабрики данных Azure. Используйте пользовательский интерфейс конвейеров для просмотра сведений об обновлении конвейера.
Подсказка
Обычное требование рабочего процесса — запустить задачу после завершения предыдущей задачи. Так как запрос конвейера updates является асинхронным — запрос возвращается после запуска обновления, но до завершения обновления — задачи в конвейере Фабрики данных Azure с зависимостью от обновления конвейера должны ждать завершения обновления. Вариантом ожидания завершения обновления является добавление действия "Until" после веб-действия, которое активирует обновление декларативных конвейеров Spark Lakeflow. В действии "Until":
- Добавьте действие ожидания, чтобы подождать заданное количество секунд для завершения обновления.
- Добавьте Web-активность после активности Ожидание, которая использует запрос на обновление сведений конвейера для получения состояния обновления. Поле
stateв ответе возвращает текущее состояние обновления, в том числе, если оно завершено. - Используйте значение поля
state, чтобы задать условие завершения для действия "Until". Можно также использовать действие Set Variable, чтобы добавить переменную пайплайна, основываясь на значенииstate, и затем использовать эту переменную для условия завершения.