Ejecutar una canalización existente con el Administrador de orquestación de flujo de trabajo
SE APLICA A: Azure Data Factory Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. ¡Obtenga más información sobre cómo iniciar una nueva evaluación gratuita!
Nota:
El Administrador de orquestación de flujo de trabajo cuenta con la tecnología de Apache Airflow.
Nota:
El Administrador de orquestación de flujo de trabajo para Azure Data Factory se basa en la aplicación Apache Airflow de código abierto. Puede encontrar documentación y más tutoriales para Airflow en las páginas de documentación o la comunidad de Apache Airflow.
Las canalizaciones de Data Factory proporcionan más de 100 conectores de origen de datos que ofrecen flujos de datos o integración de datos escalables y confiables. Habrá escenarios en los que quiera ejecutar una canalización de factoría de datos existente desde el DAG de Apache Airflow. En este tutorial se muestra cómo hacer justo eso.
Requisitos previos
- Suscripción de Azure. Si no tiene una suscripción a Azure, cree una cuenta gratuita de Azure antes de empezar.
- Cuenta de Azure Storage. Si no tiene una cuenta de almacenamiento, consulte Crear una cuenta de almacenamiento para crear una. Asegúrese de que la cuenta de almacenamiento solo permita el acceso desde las redes seleccionadas.
- Canalización de Azure Data Factory Puedes seguir cualquiera de los tutoriales y crear una nueva canalización de factoría de datos en caso de que aún no tengas una, o crear una con una selección en Comenzar y probar tu primera canalización de factoría de datos.
- Configuración de una entidad de servicio Deberá crear una nueva entidad de servicio o usar una existente y concederle permiso para ejecutar la canalización (ejemplo: rol colaborador en la factoría de datos donde existen las canalizaciones existentes), incluso si el entorno de Administrador de orquestación de flujo de trabajo y las canalizaciones existen en la misma factoría de datos. Deberás obtener el identificador de cliente y el secreto de cliente (clave de API) de la entidad de servicio.
Pasos
Cree un nuevo archivo de Python adf.py con el siguiente contenido:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Deberá crear la conexión mediante la interfaz de usuario del Administrador de orquestación de flujo de trabajo: Administración ->Conexiones -> '+' -> Elija 'tipo de conexión' como 'Azure Data Factory', después, rellene su client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name y pipeline_name.
Cargue el archivo adf.py en el almacenamiento de blobs dentro de una carpeta denominada DAGS.
Importe la carpeta DAGS en el entorno del Administrador de orquestación de flujo de trabajo. Si no tienes uno, crea uno nuevo