Mengatur pekerjaan Azure Databricks dengan Apache Airflow
Artikel ini menjelaskan dukungan Apache Airflow untuk mengatur alur data dengan Azure Databricks, memiliki instruksi untuk menginstal dan mengonfigurasi Airflow secara lokal, dan memberikan contoh penyebaran dan menjalankan alur kerja Azure Databricks dengan Airflow.
Orkestrasi pekerjaan dalam alur data
Mengembangkan dan menyebarkan alur pemrosesan data sering membutuhkan pengelolaan dependensi yang kompleks antar tugas. Misalnya, alur mungkin membaca data dari sumber, membersihkan data, mengubah data yang dibersihkan, dan menulis data yang diubah menjadi target. Anda juga memerlukan dukungan untuk kesalahan pengujian, penjadwalan, dan pemecahan masalah saat mengoprasikan alur.
Sistem alur kerja mengatasi tantangan ini dengan memungkinkan Anda menentukan dependensi antar tugas, menjadwalkan kapan alur berjalan, dan memantau alur kerja. Apache Airflow adalah solusi sumber terbuka untuk mengelola dan menjadwalkan alur data. Airflow mewujudkan alur data sebagai operasi grafik non-siklus terarah (DAGs). Anda menentukan alur kerja dalam file Python, dan Airflow mengelola penjadwalan dan eksekusi. Koneksi Airflow Azure Databricks memungkinkan Anda memanfaatkan mesin Spark yang dioptimalkan yang ditawarkan oleh Azure Databricks dengan fitur penjadwalan Airflow.
Persyaratan
- Integrasi antara Airflow dan Azure Databricks memerlukan Airflow versi 2.5.0 dan yang lebih baru. Contoh dalam artikel ini diuji dengan Airflow versi 2.6.1.
- Aliran udara memerlukan Python 3.8, 3.9, 3.10, atau 3.11. Contoh dalam artikel ini diuji dengan Python 3.8.
- Instruksi dalam artikel ini untuk menginstal dan menjalankan Airflow memerlukan pipenv untuk membuat lingkungan virtual Python.
Operator aliran udara untuk Databricks
AIRflow DAG terdiri dari tugas, di mana setiap tugas menjalankan Operator Aliran Udara. Operator Airflow yang mendukung integrasi ke Databricks diimplementasikan di penyedia Databricks.
Penyedia Databricks mencakup operator untuk menjalankan sejumlah tugas terhadap ruang kerja Azure Databricks, termasuk mengimpor data ke dalam tabel, menjalankan kueri SQL, dan bekerja dengan folder Databricks Git.
Penyedia Databricks mengimplementasikan dua operator untuk memicu pekerjaan:
- DatabricksRunNowOperator memerlukan pekerjaan Azure Databricks yang ada dan menggunakan permintaan API POST /api/2.1/jobs/run-now untuk memicu eksekusi. Databricks merekomendasikan penggunaan
DatabricksRunNowOperator
karena mengurangi duplikasi definisi pekerjaan, dan eksekusi pekerjaan yang dipicu dengan operator ini dapat ditemukan di antarmuka pengguna Pekerjaan. - DatabricksSubmitRunOperator tidak memerlukan pekerjaan untuk ada di Azure Databricks dan menggunakan permintaan POST /api/2.1/jobs/runs/submit API untuk mengirimkan spesifikasi pekerjaan dan memicu eksekusi.
Untuk membuat pekerjaan Azure Databricks baru atau mengatur ulang pekerjaan yang ada, penyedia Databricks mengimplementasikan DatabricksCreateJobsOperator. menggunakan DatabricksCreateJobsOperator
permintaan POST /api/2.1/jobs/create dan POST /api/2.1/jobs/reset API. Anda dapat menggunakan DatabricksCreateJobsOperator
dengan DatabricksRunNowOperator
untuk membuat dan menjalankan pekerjaan.
Catatan
Menggunakan operator Databricks untuk memicu pekerjaan memerlukan penyediaan kredensial dalam konfigurasi koneksi Databricks. Lihat Membuat token akses pribadi Azure Databricks untuk Airflow.
Operator Databricks Airflow menulis URL halaman eksekusi pekerjaan ke log Airflow setiap polling_period_seconds
(defaultnya adalah 30 detik). Untuk informasi lebih lanjut, lihat halaman paket apache-airflow-providers-databricks di situs web Airflow.
Menginstal integrasi Airflow Azure Databricks secara lokal
Untuk menginstal Airflow dan penyedia Databricks secara lokal untuk pengujian dan pengembangan, gunakan langkah-langkah berikut. Untuk opsi penginstalan Airflow lainnya, termasuk membuat penginstalan produksi, lihat penginstalan dalam dokumentasi Airflow.
Buka terminal dan jalankan perintah berikut:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Ganti <firstname>
, <lastname>
, dan <email>
dengan nama pengguna dan email Anda. Anda akan diminta untuk memasukkan kata sandi untuk pengguna admin. Pastikan untuk menyimpan kata sandi ini karena diperlukan untuk masuk ke antarmuka pengguna Airflow.
Skrip ini melakukan langkah-langkah berikut:
- Membuat direktori bernama
airflow
dan mengubah ke direktori tersebut. pipenv
Menggunakan untuk membuat dan menelurkan lingkungan virtual Python. Databricks merekomendasikan menggunakan lingkungan virtual Python untuk mengisolasi versi paket dan dependensi kode ke lingkungan tersebut. Isolasi ini membantu mengurangi ketidakcocokan versi paket yang tidak terduga dan tabrakan dependensi kode.- Menginisialisasi variabel lingkungan bernama yang
AIRFLOW_HOME
diatur ke jalurairflow
direktori. - Menginstal Paket penyedia Airflow dan Airflow Databricks.
airflow/dags
Membuat direktori. Airflow menggunakan direktoridags
untuk menyimpan definisi DAG.- Menginisialisasi database SQLite yang digunakan Airflow untuk melacak metadata. Dalam penyebaran Airflow produksi, Anda akan mengonfigurasi Airflow dengan database standar. Database SQLite dan konfigurasi default untuk penyebaran Airflow Anda diinsialisasi di direktori
airflow
. - Membuat pengguna admin untuk Airflow.
Tip
Untuk mengonfirmasi penginstalan penyedia Databricks, jalankan perintah berikut di direktori penginstalan Airflow:
airflow providers list
Mulai server web dan penjadwal Airflow
Server web Airflow diperlukan untuk melihat UI Airflow. Untuk memulai server web, buka terminal di direktori penginstalan Airflow dan jalankan perintah berikut:
Catatan
Jika server web Airflow gagal dimulai karena konflik port, Anda dapat mengubah port default dalam konfigurasi Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
Penjadwal adalah komponen Airflow yang menjadwalkan DAGs. Untuk memulai penjadwal, buka terminal baru di direktori penginstalan Airflow dan jalankan perintah berikut:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Menguji instalasi Airflow
Untuk memverifikasi instalasi Airflow, Anda dapat menjalankan salah satu contoh DAG yang disertakan dengan Airflow:
- Di jendela browser, buka
http://localhost:8080/home
. Masuk ke UI Airflow dengan nama pengguna dan kata sandi yang Anda buat saat menginstal Airflow. Halaman DAG Airflow muncul. - Klik tombol Pause/Unpause DAG untuk melepaskan salah satu contoh DAG, misalnya,
example_python_operator
. - Picu contoh DAG dengan mengklik tombol Picu DAG .
- Klik nama DAG untuk melihat detail, termasuk status run DAG.
Membuat token akses pribadi Azure Databricks untuk Airflow
Airflow terhubung ke Databricks menggunakan token akses pribadi (PAT) Azure Databricks. Untuk membuat PAT:
- Di ruang kerja Azure Databricks Anda, klik nama pengguna Azure Databricks Anda di bilah atas, lalu pilih Pengaturan dari menu drop-down.
- Klik Pengembang.
- Di samping Token akses, klik Kelola.
- Klik Buat token baru.
- (Opsional) Masukkan komentar yang membantu Anda mengidentifikasi token ini di masa mendatang, dan mengubah masa pakai default token selama 90 hari. Untuk membuat token tanpa masa pakai (tidak disarankan), biarkan kotak Seumur Hidup (hari) kosong (kosong).
- Klik Buat.
- Salin token yang ditampilkan ke lokasi aman, lalu klik Selesai.
Catatan
Pastikan untuk menyimpan token yang disalin di lokasi yang aman. Jangan bagikan token yang Anda salin dengan orang lain. Jika Anda kehilangan token yang disalin, Anda tidak dapat meregenerasi token yang sama persis. Sebagai gantinya, Anda harus mengulangi prosedur ini untuk membuat token baru. Jika Anda kehilangan token yang disalin, atau Anda yakin bahwa token telah disusupi, Databricks sangat menyarankan agar Anda segera menghapus token tersebut dari ruang kerja Anda dengan mengklik ikon tempat sampah (Cabut) di samping token di halaman Token akses.
Jika Anda tidak dapat membuat atau menggunakan token di ruang kerja, ini mungkin karena administrator ruang kerja Anda telah menonaktifkan token atau belum memberi Anda izin untuk membuat atau menggunakan token. Lihat administrator ruang kerja Anda atau topik berikut:
Catatan
Sebagai praktik terbaik keamanan, saat Anda mengautentikasi dengan alat, sistem, skrip, dan aplikasi otomatis, Databricks merekomendasikan agar Anda menggunakan token akses pribadi milik perwakilan layanan, bukan pengguna ruang kerja. Untuk membuat token untuk perwakilan layanan, lihat Mengelola token untuk perwakilan layanan.
Anda juga dapat mengautentikasi ke Azure Databricks menggunakan token ID Microsoft Entra. Lihat Koneksi Databricks dalam dokumentasi Airflow.
Mengonfigurasi koneksi Azure Databricks
Instalasi Airflow Anda berisi koneksi default untuk Azure Databricks. Untuk memperbarui koneksi untuk terhubung ke ruang kerja Anda menggunakan token akses pribadi yang Anda buat di atas:
- Di jendela browser, buka
http://localhost:8080/connection/list/
. Jika diminta untuk masuk, masukkan nama pengguna dan kata sandi admin Anda. - Di bawah ID Conn, temukan databricks_default dan klik tombol Edit rekaman.
- Ganti nilai di bidang Host dengan nama instans ruang kerja penyebaran Azure Databricks Anda, misalnya,
https://adb-123456789.cloud.databricks.com
. - Di bidang Kata Sandi, masukkan token akses pribadi Azure Databricks Anda.
- Klik Simpan.
Jika Anda menggunakan token ID Microsoft Entra, lihat Koneksi Databricks dalam dokumentasi Airflow untuk informasi tentang mengonfigurasi autentikasi.
Contoh: Membuat DAG Airflow untuk menjalankan pekerjaan Azure Databricks
Contoh berikut menunjukkan cara membuat penyebaran Airflow sederhana yang berjalan di mesin lokal Anda dan menyebarkan contoh DAG untuk memicu run di Azure Databricks. Dalam contoh ini, Anda akan:
- Membuat notebook baru dan tambahkan kode untuk mencetak salam berdasarkan parameter yang dikonfigurasi.
- Membuat pekerjaan Azure Databricks dengan satu tugas yang menjalankan notebook.
- Mengonfigurasi koneksi Airflow ke ruang kerja Azure Databricks Anda.
- Membuat DAG Airflow untuk memicu pekerjaan notebook. Anda menentukan DAG dalam skrip Python menggunakan
DatabricksRunNowOperator
. - Menggunakan UI Airflow untuk memicu DAG dan melihat status run.
Membuat notebook
Contoh ini menggunakan notebook yang berisi dua sel:
- Sel pertama berisi widget teks Databricks Utilities yang mendefinisikan variabel bernama
greeting
yang diatur ke nilai defaultworld
. - Sel kedua mencetak nilai variabel
greeting
yang berawalanhello
.
Untuk membuat notebook:
Buka ruang kerja Azure Databricks Anda, klik Baru di bar samping, dan pilih Buku Catatan.
Beri nama buku catatan Anda, seperti Hello Airflow, dan pastikan bahasa default diatur ke Python.
Salin kode Python berikut dan tempelkan ke sel pertama buku catatan.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Tambahkan sel baru di bawah sel pertama dan salin dan tempel kode Python berikut ke dalam sel baru:
print("hello {}".format(greeting))
Membuat pekerjaan
Klik Alur Kerja di bilah samping.
Klik .
Tab Tugas muncul dengan dialog buat tugas.
Ganti Tambahkan nama untuk pekerjaan Anda... dengan nama pekerjaan Anda.
Di bidang Nama tugas, masukkan nama untuk tugas; misalnya, tugas-memberi salam.
Di menu drop-down Jenis, pilih Buku Catatan.
Di menu drop-down Sumber, pilih Ruang Kerja.
Klik kotak teks Jalur dan gunakan browser file untuk menemukan buku catatan yang Anda buat, klik nama buku catatan, dan klik Konfirmasi.
Klik Tambahkan di bawah Parameter. Di bidang Kunci, masukkan
greeting
. Di bidang Nilai, masukkanAirflow user
.Klik Buat tugas.
Di panel Detail pekerjaan, salin nilai ID Pekerjaan. Nilai ini diperlukan untuk memicu pekerjaan dari Airflow.
Menjalankan pekerjaan
Untuk menguji pekerjaan baru Anda di antarmuka pengguna Pekerjaan Azure Databricks, klik di sudut kanan atas. Setelah proses selesai, Anda dapat memverifikasi output dengan melihat detail eksekusi pekerjaan.
Membuat DAG Airflow baru
Anda menentukan DAG Airflow dalam file Python. Untuk membuat DAG agar memicu contoh pekerjaan notebook:
Di editor teks atau IDE, buat file baru bernama
databricks_dag.py
dengan konten berikut:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Ganti
JOB_ID
dengan nilai ID pekerjaan yang disimpan sebelumnya.Simpan file itu dalam direktori
airflow/dags
. Airflow secara otomatis membaca dan menginstal file DAG yang disimpan diairflow/dags/
.
Menginstal dan memverifikasi DAG di Airflow
Untuk memicu dan memverifikasi DAG di UI Airflow:
- Di jendela browser, buka
http://localhost:8080/home
. Layar DAGs Airflow muncul. - Temukan
databricks_dag
dan klik tombol Pause/Unpause DAG untuk menjalankan DAG. - Picu DAG dengan mengklik tombol Picu DAG .
- Klik jalankan di kolom Runs untuk melihat status dan detail run.