分享方式:


使用工作流程協調流程管理員執行現有的管線

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有項目。 了解如何免費啟動新的試用版

注意

工作流程協調流程管理員由 Apache Airflow 提供。

注意

適用於 Azure Data Factory 的工作流程協調流程管理員依賴 開放原始碼 Apache Airflow 應用程式。 您可以在 Apache Airflow 檔或社群頁面上找到 Airflow 的檔和更多教學課程。

Data Factory 管線提供 100 個以上的數據源連接器,可提供可調整且可靠的數據整合/數據流。 在某些情況下,您想要從 Apache Airflow DAG 執行現有的數據處理站管線。 本教學課程說明如何只執行此動作。

必要條件

步驟

  1. 使用下列內容建立新的 Python 檔案 adf.py

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    您必須使用工作流程協調流程管理員 UI 管理員 - 連線 ions ->> '+' -> 選擇 '連線 ion 類型' 作為 'Azure Data Factory',然後填入您的client_idclient_secret、tenant_id、subscription_id、resource_group_namedata_factory_namepipeline_name。

  2. adf.py 檔案上傳至名為 DAGS 的資料夾內的 Blob 記憶體。

  3. DAGS 資料夾匯入工作流程協調流程管理員環境。 如果您沒有帳戶, 請建立新的帳戶

    顯示 [數據處理站管理] 索引卷標的螢幕快照,其中已選取 [Airflow] 區段。