Udostępnij za pośrednictwem


Uruchamianie istniejącego potoku za pomocą programu Workflow Orchestration Manager

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

Uwaga

Menedżer orkiestracji przepływu pracy jest obsługiwany przez platformę Apache Airflow.

Uwaga

Program Workflow Orchestration Manager dla usługi Azure Data Factory korzysta z aplikacji typu open source Apache Airflow. Dokumentację i więcej samouczków dotyczących rozwiązania Airflow można znaleźć na stronach dokumentacji lub społeczności platformy Apache Airflow.

Potoki usługi Data Factory zapewniają 100+ łączniki źródeł danych, które zapewniają skalowalną i niezawodną integrację danych/przepływy danych. Istnieją scenariusze, w których chcesz uruchomić istniejący potok fabryki danych z grupy DAG platformy Apache Airflow. W tym samouczku pokazano, jak to zrobić.

Wymagania wstępne

  • Subskrypcja platformy Azure. Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto platformy Azure.
  • Konto usługi Azure Storage. Jeśli nie masz konta magazynu, utwórz je, wykonując czynności przedstawione w artykule Tworzenie konta magazynu platformy Azure. Upewnij się, że konto magazynu zezwala na dostęp tylko z wybranych sieci.
  • Potok usługi Azure Data Factory. Możesz skorzystać z dowolnego samouczka i utworzyć nowy potok fabryki danych, jeśli jeszcze go nie masz, lub utworzyć jeden z jednym wybranym w obszarze Rozpocznij i wypróbować swój pierwszy potok fabryki danych.
  • Konfigurowanie jednostki usługi. Musisz utworzyć nową jednostkę usługi lub użyć istniejącej jednostki usługi i udzielić jej uprawnienia do uruchamiania potoku (na przykład — rola współautora w fabryce danych, w której istnieją istniejące potoki), nawet jeśli środowisko Menedżera orkiestracji przepływu pracy i potoki istnieją w tej samej fabryce danych. Musisz pobrać identyfikator klienta jednostki usługi i klucz tajny klienta (klucz interfejsu API).

Kroki

  1. Utwórz nowy plik w języku Python adf.py z poniższą zawartością:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Musisz utworzyć połączenie przy użyciu administratora interfejsu użytkownika programu Workflow Orchestration Manager — połączenia —> "+" —> wybierz pozycję "Typ połączenia" jako "Azure Data Factory", a następnie wypełnij client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name i pipeline_name.>

  2. Przekaż plik adf.py do magazynu obiektów blob w folderze o nazwie DAGS.

  3. Zaimportuj folder DAGS do środowiska menedżera orkiestracji przepływu pracy. Jeśli go nie masz, utwórz nowy

    Zrzut ekranu przedstawiający kartę zarządzanie fabryką danych z wybraną sekcją Airflow.