Een bestaande pijplijn uitvoeren met Workflow Orchestration Manager

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

Notitie

Workflow Orchestration Manager wordt mogelijk gemaakt door Apache Airflow.

Notitie

Werkstroomindelingsbeheer voor Azure Data Factory is afhankelijk van de open source Apache Airflow-toepassing. Documentatie en meer zelfstudies voor Airflow vindt u op de Apache Airflow-documentatie of communitypagina's.

Data Factory-pijplijnen bieden meer dan 100 connectors voor gegevensbronnen die schaalbare en betrouwbare gegevensintegratie/gegevensstromen bieden. Er zijn scenario's waarin u een bestaande data factory-pijplijn wilt uitvoeren vanuit uw Apache Airflow DAG. In deze zelfstudie leert u hoe u dat doet.

Vereisten

  • Azure-abonnement. Als u nog geen abonnement op Azure hebt, maakt u een gratis Azure-account voordat u begint.
  • Azure-opslagaccount. Als u geen opslagaccount hebt, raadpleegt u het artikel Een opslagaccount maken om een account te maken. Zorg ervoor dat het opslagaccount alleen toegang toestaat vanuit geselecteerde netwerken.
  • Azure Data Factory-pijplijn. U kunt een van de zelfstudies volgen en een nieuwe data factory-pijplijn maken voor het geval u er nog geen hebt, of een pijplijn maken met één selectie in Aan de slag en uw eerste data factory-pijplijn uitproberen.
  • Een service-principal instellen. U moet een nieuwe service-principal maken of een bestaande service-principal gebruiken en deze toestemming geven om de pijplijn uit te voeren (bijvoorbeeld de rol van inzender in de data factory waar de bestaande pijplijnen bestaan), zelfs als de werkstroomindelingsbeheeromgeving en de pijplijnen in dezelfde data factory bestaan. U moet de client-id en het clientgeheim (API-sleutel) van de service-principal ophalen.

Stappen

  1. Maak een nieuw Python-bestand adf.py met de onderstaande inhoud:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    U moet de verbinding maken met de gebruikersinterface van Workflow Orchestration Manager Beheer -> Verbinding maken ions -> '+' -> Kies 'Verbinding maken iontype' als 'Azure Data Factory' en vul vervolgens uw client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name in en pipeline_name.

  2. Upload het adf.py bestand naar uw blobopslag in een map met de naam DAGS.

  3. Importeer de DAGS-map in uw werkstroomindelingsbeheeromgeving. Als u er nog geen hebt, maakt u een nieuwe

    Schermopname van het tabblad Data Factory-beheer met de sectie Airflow geselecteerd.