Menjalankan alur yang sudah ada dengan Workflow Orchestration Manager
BERLAKU UNTUK: Azure Data Factory
Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Catatan
Workflow Orchestration Manager didukung oleh Apache Airflow.
Catatan
Workflow Orchestration Manager untuk Azure Data Factory bergantung pada aplikasi apache Airflow sumber terbuka. Dokumentasi dan tutorial lainnya untuk Airflow dapat ditemukan di halaman Dokumentasi atau Komunitas Apache Airflow.
Alur Data Factory menyediakan 100+ konektor sumber data yang menyediakan integrasi data/aliran data yang dapat diskalakan dan andal. Ada skenario di mana Anda ingin menjalankan alur pabrik data yang ada dari APACHE Airflow DAG Anda. Tutorial ini menunjukkan kepada Anda cara melakukannya.
Prasyarat
- Langganan Azure. Jika tidak memiliki langganan Azure, buat akun Azure gratis sebelum Anda memulai.
- Akun Microsoft Azure Storage. Jika Anda tidak memiliki akun penyimpanan Azure, lihat artikel Membuat akun penyimpanan Azure untuk langkah-langkah pembuatannya. Pastikan akun penyimpanan hanya mengizinkan akses dari jaringan yang dipilih.
- Alur Azure Data Factory. Anda dapat mengikuti salah satu tutorial dan membuat alur pabrik data baru jika Anda belum memilikinya, atau membuatnya dengan satu pilih di Memulai dan mencoba alur pabrik data pertama Anda.
- Menyiapkan Perwakilan Layanan. Anda harus membuat perwakilan layanan baru atau menggunakan yang sudah ada dan memberinya izin untuk menjalankan alur (contoh - peran kontributor di pabrik data tempat alur yang ada), bahkan jika lingkungan Workflow Orchestration Manager dan alur ada di pabrik data yang sama. Anda harus mendapatkan ID Klien perwakilan layanan dan Rahasia Klien (Kunci API).
Langkah-langkah
Buat file Python baru adf.py dengan konten di bawah ini:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Anda harus membuat koneksi menggunakan Admin UI Manajer Orkestrasi Alur Kerja - Koneksi ->> '+' -> Pilih 'Jenis koneksi' sebagai 'Azure Data Factory', lalu isi client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name, dan pipeline_name Anda.
Unggah file adf.py ke penyimpanan blob Anda dalam folder bernama DAGS.
Impor folder DAGS ke lingkungan Workflow Orchestration Manager Anda. Jika Anda tidak memilikinya, buat yang baru
Konten terkait
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk