Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
BERLAKU UNTUK:
Azure Data Factory
Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Penting
Pada 1 Januari 2026 Anda tidak akan lagi dapat membuat instans Airflow baru menggunakan Workflow Orchestration Manager ADF. Kami menyarankan agar Anda memigrasikan semua beban kerja Workflow Orchestration Manager (Apache Airflow di Azure Data Factory) ke pekerjaan Apache Airflow di Microsoft Fabric sebelum 31 Desember 2025.
Untuk informasi selengkapnya atau untuk dukungan selama migrasi Anda ke Apache Airflow di Microsoft Fabric, hubungi Dukungan Microsoft.
Alur Data Factory menyediakan 100+ konektor sumber data yang menyediakan integrasi data/aliran data yang dapat diskalakan dan andal. Ada skenario di mana Anda ingin menjalankan alur pabrik data yang ada dari APACHE Airflow DAG Anda. Tutorial ini menunjukkan caranya.
Prasyarat
- Langganan Azure. Jika tidak memiliki langganan Azure, buat akun Azure gratis sebelum Anda memulai.
- Akun Microsoft Azure Storage. Jika Anda tidak memiliki akun penyimpanan Azure, lihat artikel Membuat akun penyimpanan Azure untuk langkah-langkah pembuatannya. Pastikan akun penyimpanan hanya mengizinkan akses dari jaringan yang dipilih.
- Alur Azure Data Factory. Anda dapat mengikuti salah satu tutorial dan membuat alur pabrik data baru jika Anda belum memilikinya, atau membuatnya dengan satu pilih di Memulai dan mencoba alur pabrik data pertama Anda.
- Menyiapkan Perwakilan Layanan. Anda harus membuat perwakilan layanan baru atau menggunakan yang sudah ada dan memberinya izin untuk menjalankan alur (contoh - peran kontributor di pabrik data tempat alur yang ada), bahkan jika lingkungan Workflow Orchestration Manager dan alur ada di pabrik data yang sama. Anda harus mendapatkan ID Klien perwakilan layanan dan Rahasia Klien (Kunci API).
Langkah-langkah
Buat file Python baru adf.py dengan konten di bawah ini:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensorAnda harus membuat koneksi menggunakan Admin UI Manajer Orkestrasi Alur Kerja - Koneksi ->> '+' -> Pilih 'Jenis koneksi' sebagai 'Azure Data Factory', lalu isi client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name, dan pipeline_name Anda.
Unggah file adf.py ke penyimpanan blob Anda dalam folder bernama DAGS.
Impor folder DAGS ke lingkungan Workflow Orchestration Manager Anda. Jika Anda tidak memilikinya, buat yang baru