Aracılığıyla paylaş


Workflow Orchestration Manager ile mevcut bir işlem hattını çalıştırma

UYGULANANLAR: Azure Data Factory Azure Synapse Analytics

İpucu

Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!

Önemli

1 Ocak 2026'da artık ADF İş Akışı Düzenleme Yöneticisi'ni kullanarak yeni Airflow örnekleri oluşturamayacaksınız. 31 Aralık 2025'den önce tüm İş Akışı Düzenleme Yöneticisi (Azure Data Factory'de Apache Airflow) iş yüklerini Microsoft Fabric'teki Apache Airflow işlerine geçirmenizi öneririz.

Microsoft Fabric'te Apache Airflow'a geçişiniz sırasında daha fazla bilgi veya destek için Microsoft Desteği'ne başvurun.

Data Factory işlem hatları, ölçeklenebilir ve güvenilir veri tümleştirmesi/veri akışları sağlayan 100'den fazla veri kaynağı bağlayıcısı sağlar. Apache Airflow DAG'nizden mevcut bir veri fabrikası işlem hattını çalıştırmak istediğiniz senaryolar vardır. Bu kılavuz size nasıl yapılacağını gösterir.

Önkoşullar

Adımlar

  1. Aşağıdaki içeriklerle adf.py yeni bir Python dosyası oluşturun:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Workflow Orchestration Manager KULLANıCı Arabirimi Yöneticisi - Bağlantılar ->> '+' -> 'Azure Data Factory' olarak 'Bağlantı türü' seçeneğini belirleyip client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name ve pipeline_name kullanarak bağlantı oluşturmanız gerekir.

  2. adf.py dosyasını DAGS adlı bir klasör içinde blob depolamanıza yükleyin.

  3. DAGS klasörünü workflow orchestration manager ortamınıza aktarın. Yoksa yeni bir tane oluşturun

    Airflow bölümünün seçili olduğu veri fabrikası yönetim sekmesini gösteren ekran görüntüsü.