Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Anda dapat menjalankan alur sebagai bagian dari alur kerja pemrosesan data dengan Lakeflow Jobs, Apache Airflow, atau Azure Data Factory.
Pekerjaan
Anda dapat mengatur beberapa tugas dalam pekerjaan Databricks untuk menerapkan alur kerja pemrosesan data. Untuk menyertakan alur dalam pekerjaan, gunakan tugas Alur saat Anda membuat pekerjaan. Lihat Tugas pipeline untuk pekerjaan.
Apache Airflow
Apache Airflow adalah solusi sumber terbuka untuk mengelola dan menjadwalkan alur kerja data. Airflow mewakili alur kerja sebagai graf asiklik (DAG) operasi yang diarahkan. Anda menentukan alur kerja dalam file Python dan Airflow mengelola penjadwalan dan eksekusi. Untuk informasi tentang menginstal dan menggunakan Airflow dengan Azure Databricks, lihat Mengatur Pekerjaan Lakeflow dengan Apache Airflow.
Untuk menjalankan alur sebagai bagian dari alur kerja Airflow, gunakan DatabricksSubmitRunOperator.
Persyaratan
Berikut adalah persyaratan untuk menggunakan dukungan Airflow pada Lakeflow Spark Declarative Pipelines:
- Airflow versi 2.1.0 atau yang lebih baru.
- Paket penyedia Databricks versi 2.1.0 atau yang lebih baru.
Example
Contoh berikut membuat Airflow DAG yang memicu pembaruan untuk jalur pemrosesan dengan pengidentifikasi 8279d543-063c-4d63-9926-dae38e35ce8b:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Ganti CONNECTION_ID dengan pengidentifikasi untuk koneksi Airflow ke ruang kerja Anda.
Simpan contoh ini di airflow/dags direktori dan gunakan UI Airflow untuk melihat dan memicu DAG. Gunakan UI alur untuk melihat detail pembaruan alur.
Azure Data Factory
Nota
Lakeflow Spark Declarative Pipelines dan Azure Data Factory masing-masing menyertakan opsi untuk mengonfigurasi jumlah percobaan ulang saat kegagalan terjadi. Jika nilai coba lagi dikonfigurasi pada alur Anda dan pada aktivitas Azure Data Factory yang memanggil alur, jumlah coba lagi diperoleh dengan mengalikan nilai coba lagi Azure Data Factory dengan nilai coba lagi alur.
Misalnya, jika pembaruan alur gagal, Lakeflow Spark Declarative Pipelines mencoba kembali pembaruan hingga lima kali secara default. Jika pengulangan di Azure Data Factory diatur ke tiga, dan pipeline Anda menggunakan default lima percobaan ulang, pipeline yang gagal mungkin akan dicoba ulang hingga lima belas kali. Untuk menghindari upaya coba lagi yang berlebihan saat pembaruan alur gagal, Databricks merekomendasikan untuk membatasi jumlah percobaan ulang saat mengonfigurasi alur atau aktivitas Azure Data Factory yang memanggil alur.
Untuk mengubah konfigurasi pengulangan untuk alur Anda, gunakan parameter pipelines.numUpdateRetryAttempts saat mengonfigurasi alur.
Azure Data Factory adalah layanan ETL berbasis cloud yang memungkinkan Anda mengatur alur kerja integrasi dan transformasi data. Azure Data Factory secara langsung mendukung menjalankan tugas Azure Databricks dalam alur kerja, termasuk notebook, tugas JAR, dan skrip Python. Anda juga dapat menambahkan pipeline ke dalam alur kerja dengan memanggil API REST pipeline dari Aktivitas web Azure Data Factory. Misalnya, untuk memicu pembaruan alur dari Azure Data Factory:
Buat pabrik data atau buka pabrik data yang ada.
Saat pembuatan selesai, buka halaman pabrik data Anda dan klik ubin Buka Azure Data Factory Studio. Antarmuka pengguna Azure Data Factory muncul.
Buat alur Azure Data Factory baru dengan memilih Alur dari menu drop-down Baru di antarmuka pengguna Azure Data Factory Studio.
Di kotak alat Aktivitas , perluas Umum dan seret aktivitas Web ke kanvas alur. Klik tab Pengaturan dan masukkan nilai berikut ini:
Nota
Sebagai praktik terbaik keamanan, saat Anda mengautentikasi dengan alat, sistem, skrip, dan aplikasi otomatis, Databricks merekomendasikan agar Anda menggunakan token akses pribadi milik perwakilan layanan , bukan pengguna ruang kerja. Untuk membuat token untuk perwakilan layanan, lihat Mengelola token untuk perwakilan layanan.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.Ganti
<get-workspace-instance>.Ganti
<pipeline-id>dengan pengidentifikasi alur.Metode: Pilih POST dari menu drop-down.
Header: Klik + Baru. Dalam kotak teks Nama , masukkan
Authorization. Dalam kotak teks Nilai , masukkanBearer <personal-access-token>.Ganti
<personal-access-token>dengan token akses pribadi Azure Databricks.Isi: Untuk meneruskan parameter permintaan tambahan, masukkan dokumen JSON yang berisi parameter. Misalnya, untuk memulai pembaruan dan memproses ulang semua data untuk alur:
{"full_refresh": "true"}. Jika tidak ada parameter permintaan tambahan, masukkan kurung kurawal kosong ({}).
Untuk menguji aktivitas Web, klik Debug pada toolbar alur di antarmuka pengguna Data Factory. Output dan status eksekusi, termasuk kesalahan, ditampilkan di tab Output dari alur Azure Data Factory. Gunakan antarmuka pengguna pipeline untuk melihat detail pembaruan pipeline.
Petunjuk / Saran
Persyaratan alur kerja umum adalah memulai tugas setelah menyelesaikan tugas sebelumnya. Karena permintaan pipeline updates bersifat asinkron—permintaan kembali setelah memulai pembaruan tetapi sebelum pembaruan selesai—maka tugas dalam pipeline Azure Data Factory Anda yang bergantung pada pembaruan pipeline tersebut harus menunggu hingga pembaruan tersebut selesai. Opsi untuk menunggu penyelesaian pembaruan adalah menambahkan aktivitas Until setelah aktivitas Web yang memicu pembaruan Alur Deklaratif Lakeflow Spark. Dalam aktivitas Until:
- Tambahkan aktivitas Tunggu untuk menunggu jumlah detik yang dikonfigurasi untuk penyelesaian pembaruan.
- Tambahkan aktivitas Web setelah aktivitas Tunggu yang menggunakan permintaan detail pembaruan alur untuk mendapatkan status pembaruan. Bidang
statedalam respons mengembalikan status pembaruan saat ini, termasuk jika telah selesai. - Gunakan nilai
statebidang untuk mengatur kondisi penghentian untuk aktivitas Until. Anda juga dapat menggunakan aktivitas Atur Variabel untuk menetapkan variabel pipeline berdasarkan nilaistatedan menggunakan variabel ini sebagai kondisi penghentian.