使用工作流程協調流程管理員執行現有的管線
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有項目。 了解如何免費啟動新的試用版!
注意
工作流程協調流程管理員由 Apache Airflow 提供。
注意
適用於 Azure Data Factory 的工作流程協調流程管理員依賴 開放原始碼 Apache Airflow 應用程式。 您可以在 Apache Airflow 檔或社群頁面上找到 Airflow 的檔和更多教學課程。
Data Factory 管線提供 100 個以上的數據源連接器,可提供可調整且可靠的數據整合/數據流。 在某些情況下,您想要從 Apache Airflow DAG 執行現有的數據處理站管線。 本教學課程說明如何只執行此動作。
必要條件
- Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費 Azure 帳戶。
- Azure 記憶體帳戶。 如果您沒有記憶體帳戶,請參閱 建立 Azure 記憶體帳戶以取得建立記憶體帳戶 的步驟。 確定記憶體帳戶只允許從選取的網路存取。
- Azure Data Factory 管線。 您可以遵循任何教學課程並建立新的數據處理站管線,以防您還沒有該教學課程,或在開始使用中 選取一個,並試用您的第一個數據處理站管線。
- 設定服務主體。 您必須建立新的服務主體或使用現有的服務主體,並授與它執行管線的許可權(例如 - 現有管線存在之數據處理站中的參與者角色),即使工作流程協調流程管理員環境和管線存在於相同的數據處理站也一樣。 您必須取得服務主體的用戶端識別碼和客戶端密碼(API 金鑰)。
步驟
使用下列內容建立新的 Python 檔案 adf.py :
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
您必須使用工作流程協調流程管理員 UI 管理員 - 連線 ions ->> '+' -> 選擇 '連線 ion 類型' 作為 'Azure Data Factory',然後填入您的client_id、client_secret、tenant_id、subscription_id、resource_group_name、data_factory_name和pipeline_name。
將 adf.py 檔案上傳至名為 DAGS 的資料夾內的 Blob 記憶體。
將 DAGS 資料夾匯入工作流程協調流程管理員環境。 如果您沒有帳戶, 請建立新的帳戶