Ausführen einer vorhandenen Pipeline mit Workflow Orchestration Manager
GILT FÜR: Azure Data Factory Azure Synapse Analytics
Tipp
Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!
Hinweis
Workflow Orchestration Manager wird von Apache Airflow unterstützt.
Hinweis
Workflow Orchestration Manager für Azure Data Factory basiert auf der Open-Source-Anwendung Apache Airflow. Die Dokumentation und weitere Tutorials zu Airflow finden Sie in der Dokumentation zu Apache Airflow oder auf den Communityseiten.
Data Factory-Pipelines bieten mehr als 100 Datenquellenconnectors, die skalierbare und zuverlässige Datenintegrationen/Datenflüsse bieten. Es gibt Szenarien, in denen Sie eine vorhandene Data Factory-Pipeline über Ihren Apache Airflow-DAG ausführen möchten. Dieses Tutorial veranschaulicht die Vorgehensweise.
Voraussetzungen
- Azure-Abonnement. Wenn Sie über kein Azure-Abonnement verfügen, können Sie ein kostenloses Azure-Konto erstellen, bevor Sie beginnen.
- Azure-Speicherkonto. Wenn Sie kein Speicherkonto besitzen, finden Sie unter Informationen zu Azure-Speicherkonten Schritte zum Erstellen eines solchen Kontos. Stellen Sie sicher, dass das Speicherkonto nur den Zugriff über ausgewählte Netzwerke zulässt.
- Azure Data Factory-Pipeline. Sie können einem der Tutorials folgen und eine neue Data Factory-Pipeline erstellen, falls Sie noch keine haben. Alternativ können Sie unter Erste Schritte mit Azure Data Factory mit nur einer Auswahl eine erstellen.
- Einrichten eines Dienstprinzipals. Sie müssen einen neuen Dienstprinzipal erstellen oder einen vorhandenen verwenden und ihm die Berechtigung zum Ausführen der Pipeline erteilen (z. B. die Rolle „Mitwirkender“ in der Data Factory mit den bereits vorhandenen Pipelines), auch wenn sich die Workflow Orchestration Manager-Umgebung und die Pipelines in derselben Data Factory befinden. Sie müssen die Client-ID und den geheimen Clientschlüssel (API-Schlüssel) des Dienstprinzipals abrufen.
Schritte
Erstellen Sie eine neue Python-Datei vom Typ adf.py mit dem folgenden Inhalt:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Sie müssen die Verbindung über die Benutzeroberfläche von Workflow Orchestration Manager: „Administrator“ > „Verbindungen“ > „+“ > Wählen Sie als „Verbindungstyp“ die Option „Azure Data Factory“ aus, und geben Sie dann Ihre Werte für client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name und pipeline_name ein.
Laden Sie die Datei adf.py in Ihren Blobspeicher in einen Ordner namens DAGS hoch.
Importieren Sie den Ordner DAGS in Ihre Workflow Orchestration Manager-Umgebung. Falls Sie noch keine besitzen, erstellen Sie eine neue