Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
VAN TOEPASSING OP:
Azure Data Factory
Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
Belangrijk
Op 1 januari 2026 kunt u geen nieuwe Airflow-exemplaren meer maken met werkstroomindelingsbeheer van ADF. We raden u aan alle workloads voor Workflow Orchestration Manager (Apache Airflow in Azure Data Factory) vóór 31 december 2025 te migreren naar Apache Airflow-taken in Microsoft Fabric.
Voor meer informatie of ondersteuning tijdens uw migratie naar Apache Airflow in Microsoft Fabric, neem contact op met Microsoft Support.
Data Factory-pijplijnen bieden meer dan 100 connectors voor gegevensbronnen die schaalbare en betrouwbare gegevensintegratie/gegevensstromen bieden. Er zijn scenario's waarin u een bestaande data factory-pijplijn wilt uitvoeren vanuit uw Apache Airflow DAG. In deze zelfstudie leert u hoe u dit doet.
Vereisten
- Azure-abonnement. Als u nog geen abonnement op Azure hebt, maakt u een gratis Azure-account voordat u begint.
- Azure-opslagaccount. Als u geen opslagaccount hebt, raadpleegt u het artikel Een opslagaccount maken om een account te maken. Zorg ervoor dat het opslagaccount alleen toegang toestaat vanuit geselecteerde netwerken.
- Azure Data Factory-pijplijn. U kunt een van de zelfstudies volgen en een nieuwe data factory-pijplijn maken voor het geval u er nog geen hebt, of een pijplijn maken met één selectie in Aan de slag en uw eerste data factory-pijplijn uitproberen.
- Een service-principal instellen. U moet een nieuwe service-principal maken of een bestaande service-principal gebruiken en deze toestemming geven om de pijplijn uit te voeren (bijvoorbeeld de rol van inzender in de data factory waar de bestaande pijplijnen bestaan), zelfs als de werkstroomindelingsbeheeromgeving en de pijplijnen in dezelfde data factory bestaan. U moet de client-id en het clientgeheim (API-sleutel) van de service-principal ophalen.
Stappen
Maak een nieuw Python-bestand adf.py met de onderstaande inhoud:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensorU moet de verbinding maken met de ui-beheerder van werkstroomindelingsbeheer - Verbindingen ->> '+' -> Kies 'Verbindingstype' als 'Azure Data Factory' en vul vervolgens uw client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name en pipeline_name in.
Upload het adf.py bestand naar uw blobopslag in een map met de naam DAGS.
Importeer de DAGS-map in uw werkstroomindelingsbeheeromgeving. Als u er nog geen hebt, maakt u een nieuwe