Bagikan melalui


Menjalankan alur yang sudah ada dengan Workflow Orchestration Manager

BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Catatan

Workflow Orchestration Manager didukung oleh Apache Airflow.

Catatan

Workflow Orchestration Manager untuk Azure Data Factory bergantung pada aplikasi apache Airflow sumber terbuka. Dokumentasi dan tutorial lainnya untuk Airflow dapat ditemukan di halaman Dokumentasi atau Komunitas Apache Airflow.

Alur Data Factory menyediakan 100+ konektor sumber data yang menyediakan integrasi data/aliran data yang dapat diskalakan dan andal. Ada skenario di mana Anda ingin menjalankan alur pabrik data yang ada dari APACHE Airflow DAG Anda. Tutorial ini menunjukkan kepada Anda cara melakukannya.

Prasyarat

  • Langganan Azure. Jika tidak memiliki langganan Azure, buat akun Azure gratis sebelum Anda memulai.
  • Akun Microsoft Azure Storage. Jika Anda tidak memiliki akun penyimpanan Azure, lihat artikel Membuat akun penyimpanan Azure untuk langkah-langkah pembuatannya. Pastikan akun penyimpanan hanya mengizinkan akses dari jaringan yang dipilih.
  • Alur Azure Data Factory. Anda dapat mengikuti salah satu tutorial dan membuat alur pabrik data baru jika Anda belum memilikinya, atau membuatnya dengan satu pilih di Memulai dan mencoba alur pabrik data pertama Anda.
  • Menyiapkan Perwakilan Layanan. Anda harus membuat perwakilan layanan baru atau menggunakan yang sudah ada dan memberinya izin untuk menjalankan alur (contoh - peran kontributor di pabrik data tempat alur yang ada), bahkan jika lingkungan Workflow Orchestration Manager dan alur ada di pabrik data yang sama. Anda harus mendapatkan ID Klien perwakilan layanan dan Rahasia Klien (Kunci API).

Langkah-langkah

  1. Buat file Python baru adf.py dengan konten di bawah ini:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Anda harus membuat koneksi menggunakan Admin UI Manajer Orkestrasi Alur Kerja - Koneksi ->> '+' -> Pilih 'Jenis koneksi' sebagai 'Azure Data Factory', lalu isi client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name, dan pipeline_name Anda.

  2. Unggah file adf.py ke penyimpanan blob Anda dalam folder bernama DAGS.

  3. Impor folder DAGS ke lingkungan Workflow Orchestration Manager Anda. Jika Anda tidak memilikinya, buat yang baru

    Cuplikan layar memperlihatkan tab manajemen pabrik data dengan bagian Airflow dipilih.