Compartir a través de


Ejecutar canalizaciones declarativas de Lakeflow en un flujo de trabajo

Puede ejecutar canalizaciones declarativas de Lakeflow como parte de un flujo de trabajo de procesamiento de datos en Trabajos de Lakeflow, Apache Airflow o Azure Data Factory.

Trabajos

Puede organizar varias tareas en un trabajo de Databricks para implementar un flujo de trabajo de procesamiento de datos. Para incluir una canalización en un trabajo, use la tarea Canalización al crear un trabajo. Consulte Tarea de canalización para trabajos.

Airflow de Apache

Apache Airflow es una solución de código abierto para administrar y programar flujos de trabajo de datos. Airflow representa los flujos de trabajo como gráficos acíclicos dirigidos (DAG) de operaciones. El flujo de trabajo se define en un archivo de Python y Airflow administra la programación y la ejecución. Para obtener información sobre cómo instalar y usar Airflow con Azure Databricks, consulte Orquestación de trabajos de Lakeflow con Apache Airflow.

Para ejecutar una canalización como parte de un flujo de trabajo de Airflow, use DatabricksSubmitRunOperator.

Requisitos

Lo siguiente se requiere para usar la compatibilidad de Airflow con las canalizaciones declarativas de Lakeflow:

Ejemplo

En el ejemplo siguiente se crea un DAG de flujo de aire que desencadena una actualización para la canalización con el identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Reemplace CONNECTION_ID por el identificador de una conexión de Airflow al área de trabajo.

Guarde este ejemplo en el directorio airflow/dags y utilice la interfaz de usuario de Airflow para ver y desencadenar el DAG. Use la interfaz de usuario de Canalizaciones declarativas de Lakeflow para ver los detalles de la actualización de la canalización.

Azure Data Factory

Nota:

Las canalizaciones declarativas de Lakeflow y Azure Data Factory incluyen ambas opciones para configurar el número de reintentos cuando se produce un error. Si los valores de reintento están configurados tanto en la canalización como en la actividad de Azure Data Factory que llama a la canalización, el número de reintentos se calcula multiplicando el valor de reintento de Azure Data Factory por el valor de reintento de las canalizaciones declarativas de Lakeflow.

Por ejemplo, si se produce un error en una actualización de canalización, Lakeflow Declarative Pipelines reintenta la actualización hasta cinco veces de forma predeterminada. Si el reintento de Azure Data Factory se establece en tres y la canalización usa el valor predeterminado de cinco reintentos, es posible que la canalización con errores se vuelva a intentar hasta quince veces. Para evitar intentos excesivos de reintento cuando se producen errores en las actualizaciones de canalización, Databricks recomienda limitar el número de reintentos al configurar la canalización o la actividad de Azure Data Factory que llama a la canalización.

Para cambiar la configuración de reintentos de la canalización, utilice la opción pipelines.numUpdateRetryAttempts al ajustarla.

Azure Data Factory es un servicio ETL basado en la nube que le permite organizar flujos de trabajo de integración y transformación de datos. Azure Data Factory admite directamente la ejecución de tareas de Azure Databricks en un flujo de trabajo, incluidos cuadernos, tareas JAR y scripts de Python. También puede incluir una canalización en un flujo de trabajo llamando a la API de canalizaciones declarativas de Lakeflow desde una actividad web de Azure Data Factory. Por ejemplo, para desencadenar una actualización de canalización desde Azure Data Factory:

  1. Cree una factoría de datos o abra una existente.

  2. Cuando se complete la creación, abra la página de la factoría de datos y haga clic en el icono Abrir Azure Data Factory Studio. Aparecerá la interfaz de usuario de Azure Data Factory.

  3. Para crear una canalización de Azure Data Factory, seleccione Canalización en el menú desplegable Nuevo de la interfaz de usuario de Azure Data Factory Studio.

  4. En el cuadro de herramientas Actividades, expanda General y arrastre la actividad Web al lienzo de la canalización. Haga clic en la pestaña Configuración y escriba los valores siguientes:

    Nota:

    Como procedimiento recomendado de seguridad, al autenticarse con herramientas automatizadas, sistemas, scripts y aplicaciones, Databricks recomienda usar tokens de acceso personales que pertenecen a entidades de servicio en lugar de usuarios del área de trabajo. Para crear tokens para entidades de servicio, consulte Administrar tokens para una entidad de servicio.

    • Dirección URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Reemplace <get-workspace-instance>.

      Reemplace <pipeline-id> por el identificador de la canalización.

    • Método: seleccione POST en el menú desplegable.

    • Encabezados: haga clic en + Nuevo. En el cuadro de texto Nombre, escribe Authorization. En el cuadro de texto Valor, escriba Bearer <personal-access-token>.

      Reemplace <personal-access-token> por un token de acceso personal de Azure Databricks.

    • Cuerpo: para pasar parámetros adicionales de solicitud, especifique un documento JSON que contenga los parámetros. Por ejemplo, para iniciar una actualización y volver a procesar todos los datos de la canalización: {"full_refresh": "true"}. Si no hay otros parámetros de solicitud, escriba llaves vacías ({}).

Para probar la actividad web, haga clic en Depurar en la barra de herramientas de la canalización en la interfaz de usuario de Data Factory. La salida y el estado de la ejecución (incluidos los errores) se muestran en la pestaña Salida de la canalización de Azure Data Factory. Use la interfaz de usuario de Canalizaciones declarativas de Lakeflow para ver los detalles de la actualización de la canalización.

Sugerencia

Un requisito de flujo de trabajo común es iniciar una tarea después de completar una tarea anterior. Dado que la solicitud de las canalizaciones declarativas de Lakeflow es asincrónica—la solicitud se devuelve después de iniciar la actualización, pero antes de que esta se complete—las tareas de su canalización de Azure Data Factory que dependen de la actualización de las canalizaciones declarativas de Lakeflow deben esperar a que la actualización se complete. Una opción para esperar la finalización de la actualización es agregar una actividad Until después de la actividad web que desencadena la actualización de canalizaciones declarativas de Lakeflow. En la actividad Until:

  1. Agregue una actividad Wait para esperar un número configurado de segundos a que se complete la actualización.
  2. Agregue una actividad web después de la actividad Wait que usa la solicitud de detalles de actualización de canalizaciones declarativas de Lakeflow para obtener el estado de la actualización. El campo state de la respuesta devuelve el estado actual de la actualización, incluido si se completó.
  3. Utilice el valor del campo state para establecer la condición de finalización de la actividad Until. También puede utilizar una actividad Establecer variable para agregar una variable de canalización en función del valor state y utilizar esta variable para la condición de finalización.