Bagikan melalui


Mengatur pekerjaan Azure Databricks dengan Apache Airflow

Artikel ini menjelaskan dukungan Apache Airflow untuk mengatur alur data dengan Azure Databricks, memiliki instruksi untuk menginstal dan mengonfigurasi Airflow secara lokal, dan memberikan contoh penyebaran dan menjalankan alur kerja Azure Databricks dengan Airflow.

Orkestrasi pekerjaan dalam alur data

Mengembangkan dan menyebarkan alur pemrosesan data sering membutuhkan pengelolaan dependensi yang kompleks antar tugas. Misalnya, alur mungkin membaca data dari sumber, membersihkan data, mengubah data yang dibersihkan, dan menulis data yang diubah menjadi target. Anda juga memerlukan dukungan untuk kesalahan pengujian, penjadwalan, dan pemecahan masalah saat mengoprasikan alur.

Sistem alur kerja mengatasi tantangan ini dengan memungkinkan Anda menentukan dependensi antar tugas, menjadwalkan kapan alur berjalan, dan memantau alur kerja. Apache Airflow adalah solusi sumber terbuka untuk mengelola dan menjadwalkan alur data. Airflow mewujudkan alur data sebagai operasi grafik non-siklus terarah (DAGs). Anda menentukan alur kerja dalam file Python, dan Airflow mengelola penjadwalan dan eksekusi. Koneksi Airflow Azure Databricks memungkinkan Anda memanfaatkan mesin Spark yang dioptimalkan yang ditawarkan oleh Azure Databricks dengan fitur penjadwalan Airflow.

Persyaratan

  • Integrasi antara Airflow dan Azure Databricks memerlukan Airflow versi 2.5.0 dan yang lebih baru. Contoh dalam artikel ini diuji dengan Airflow versi 2.6.1.
  • Aliran udara memerlukan Python 3.8, 3.9, 3.10, atau 3.11. Contoh dalam artikel ini diuji dengan Python 3.8.
  • Instruksi dalam artikel ini untuk menginstal dan menjalankan Airflow memerlukan pipenv untuk membuat lingkungan virtual Python.

Operator aliran udara untuk Databricks

AIRflow DAG terdiri dari tugas, di mana setiap tugas menjalankan Operator Aliran Udara. Operator Airflow yang mendukung integrasi ke Databricks diimplementasikan di penyedia Databricks.

Penyedia Databricks mencakup operator untuk menjalankan sejumlah tugas terhadap ruang kerja Azure Databricks, termasuk mengimpor data ke dalam tabel, menjalankan kueri SQL, dan bekerja dengan folder Databricks Git.

Penyedia Databricks mengimplementasikan dua operator untuk memicu pekerjaan:

Untuk membuat pekerjaan Azure Databricks baru atau mengatur ulang pekerjaan yang ada, penyedia Databricks mengimplementasikan DatabricksCreateJobsOperator. menggunakan DatabricksCreateJobsOperator permintaan POST /api/2.1/jobs/create dan POST /api/2.1/jobs/reset API. Anda dapat menggunakan DatabricksCreateJobsOperator dengan DatabricksRunNowOperator untuk membuat dan menjalankan pekerjaan.

Catatan

Menggunakan operator Databricks untuk memicu pekerjaan memerlukan penyediaan kredensial dalam konfigurasi koneksi Databricks. Lihat Membuat token akses pribadi Azure Databricks untuk Airflow.

Operator Databricks Airflow menulis URL halaman eksekusi pekerjaan ke log Airflow setiap polling_period_seconds (defaultnya adalah 30 detik). Untuk informasi lebih lanjut, lihat halaman paket apache-airflow-providers-databricks di situs web Airflow.

Menginstal integrasi Airflow Azure Databricks secara lokal

Untuk menginstal Airflow dan penyedia Databricks secara lokal untuk pengujian dan pengembangan, gunakan langkah-langkah berikut. Untuk opsi penginstalan Airflow lainnya, termasuk membuat penginstalan produksi, lihat penginstalan dalam dokumentasi Airflow.

Buka terminal dan jalankan perintah berikut:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Ganti <firstname>, <lastname>, dan <email> dengan nama pengguna dan email Anda. Anda akan diminta untuk memasukkan kata sandi untuk pengguna admin. Pastikan untuk menyimpan kata sandi ini karena diperlukan untuk masuk ke antarmuka pengguna Airflow.

Skrip ini melakukan langkah-langkah berikut:

  1. Membuat direktori bernama airflow dan mengubah ke direktori tersebut.
  2. pipenv Menggunakan untuk membuat dan menelurkan lingkungan virtual Python. Databricks merekomendasikan menggunakan lingkungan virtual Python untuk mengisolasi versi paket dan dependensi kode ke lingkungan tersebut. Isolasi ini membantu mengurangi ketidakcocokan versi paket yang tidak terduga dan tabrakan dependensi kode.
  3. Menginisialisasi variabel lingkungan bernama yang AIRFLOW_HOME diatur ke jalur airflow direktori.
  4. Menginstal Paket penyedia Airflow dan Airflow Databricks.
  5. airflow/dags Membuat direktori. Airflow menggunakan direktori dags untuk menyimpan definisi DAG.
  6. Menginisialisasi database SQLite yang digunakan Airflow untuk melacak metadata. Dalam penyebaran Airflow produksi, Anda akan mengonfigurasi Airflow dengan database standar. Database SQLite dan konfigurasi default untuk penyebaran Airflow Anda diinsialisasi di direktori airflow.
  7. Membuat pengguna admin untuk Airflow.

Tip

Untuk mengonfirmasi penginstalan penyedia Databricks, jalankan perintah berikut di direktori penginstalan Airflow:

airflow providers list

Mulai server web dan penjadwal Airflow

Server web Airflow diperlukan untuk melihat UI Airflow. Untuk memulai server web, buka terminal di direktori penginstalan Airflow dan jalankan perintah berikut:

Catatan

Jika server web Airflow gagal dimulai karena konflik port, Anda dapat mengubah port default dalam konfigurasi Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Penjadwal adalah komponen Airflow yang menjadwalkan DAGs. Untuk memulai penjadwal, buka terminal baru di direktori penginstalan Airflow dan jalankan perintah berikut:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Menguji instalasi Airflow

Untuk memverifikasi instalasi Airflow, Anda dapat menjalankan salah satu contoh DAG yang disertakan dengan Airflow:

  1. Di jendela browser, buka http://localhost:8080/home. Masuk ke UI Airflow dengan nama pengguna dan kata sandi yang Anda buat saat menginstal Airflow. Halaman DAG Airflow muncul.
  2. Klik tombol Pause/Unpause DAG untuk melepaskan salah satu contoh DAG, misalnya,example_python_operator.
  3. Picu contoh DAG dengan mengklik tombol Picu DAG .
  4. Klik nama DAG untuk melihat detail, termasuk status run DAG.

Membuat token akses pribadi Azure Databricks untuk Airflow

Airflow terhubung ke Databricks menggunakan token akses pribadi (PAT) Azure Databricks. Untuk membuat PAT:

  1. Di ruang kerja Azure Databricks Anda, klik nama pengguna Azure Databricks Anda di bilah atas, lalu pilih Pengaturan dari menu drop-down.
  2. Klik Pengembang.
  3. Di samping Token akses, klik Kelola.
  4. Klik Buat token baru.
  5. (Opsional) Masukkan komentar yang membantu Anda mengidentifikasi token ini di masa mendatang, dan mengubah masa pakai default token selama 90 hari. Untuk membuat token tanpa masa pakai (tidak disarankan), biarkan kotak Seumur Hidup (hari) kosong (kosong).
  6. Klik Buat.
  7. Salin token yang ditampilkan ke lokasi aman, lalu klik Selesai.

Catatan

Pastikan untuk menyimpan token yang disalin di lokasi yang aman. Jangan bagikan token yang Anda salin dengan orang lain. Jika Anda kehilangan token yang disalin, Anda tidak dapat meregenerasi token yang sama persis. Sebagai gantinya, Anda harus mengulangi prosedur ini untuk membuat token baru. Jika Anda kehilangan token yang disalin, atau Anda yakin bahwa token telah disusupi, Databricks sangat menyarankan agar Anda segera menghapus token tersebut dari ruang kerja Anda dengan mengklik ikon tempat sampah (Cabut) di samping token di halaman Token akses.

Jika Anda tidak dapat membuat atau menggunakan token di ruang kerja, ini mungkin karena administrator ruang kerja Anda telah menonaktifkan token atau belum memberi Anda izin untuk membuat atau menggunakan token. Lihat administrator ruang kerja Anda atau topik berikut:

Catatan

Sebagai praktik terbaik keamanan, saat Anda mengautentikasi dengan alat, sistem, skrip, dan aplikasi otomatis, Databricks merekomendasikan agar Anda menggunakan token akses pribadi milik perwakilan layanan, bukan pengguna ruang kerja. Untuk membuat token untuk perwakilan layanan, lihat Mengelola token untuk perwakilan layanan.

Anda juga dapat mengautentikasi ke Azure Databricks menggunakan token ID Microsoft Entra. Lihat Koneksi Databricks dalam dokumentasi Airflow.

Mengonfigurasi koneksi Azure Databricks

Instalasi Airflow Anda berisi koneksi default untuk Azure Databricks. Untuk memperbarui koneksi untuk terhubung ke ruang kerja Anda menggunakan token akses pribadi yang Anda buat di atas:

  1. Di jendela browser, buka http://localhost:8080/connection/list/. Jika diminta untuk masuk, masukkan nama pengguna dan kata sandi admin Anda.
  2. Di bawah ID Conn, temukan databricks_default dan klik tombol Edit rekaman.
  3. Ganti nilai di bidang Host dengan nama instans ruang kerja penyebaran Azure Databricks Anda, misalnya, https://adb-123456789.cloud.databricks.com.
  4. Di bidang Kata Sandi, masukkan token akses pribadi Azure Databricks Anda.
  5. Klik Simpan.

Jika Anda menggunakan token ID Microsoft Entra, lihat Koneksi Databricks dalam dokumentasi Airflow untuk informasi tentang mengonfigurasi autentikasi.

Contoh: Membuat DAG Airflow untuk menjalankan pekerjaan Azure Databricks

Contoh berikut menunjukkan cara membuat penyebaran Airflow sederhana yang berjalan di mesin lokal Anda dan menyebarkan contoh DAG untuk memicu run di Azure Databricks. Dalam contoh ini, Anda akan:

  1. Membuat notebook baru dan tambahkan kode untuk mencetak salam berdasarkan parameter yang dikonfigurasi.
  2. Membuat pekerjaan Azure Databricks dengan satu tugas yang menjalankan notebook.
  3. Mengonfigurasi koneksi Airflow ke ruang kerja Azure Databricks Anda.
  4. Membuat DAG Airflow untuk memicu pekerjaan notebook. Anda menentukan DAG dalam skrip Python menggunakan DatabricksRunNowOperator.
  5. Menggunakan UI Airflow untuk memicu DAG dan melihat status run.

Membuat notebook

Contoh ini menggunakan notebook yang berisi dua sel:

  • Sel pertama berisi widget teks Databricks Utilities yang mendefinisikan variabel bernama greeting yang diatur ke nilai default world.
  • Sel kedua mencetak nilai variabelgreeting yang berawalan hello.

Untuk membuat notebook:

  1. Buka ruang kerja Azure Databricks Anda, klik Ikon Baru Baru di bar samping, dan pilih Buku Catatan.

  2. Beri nama buku catatan Anda, seperti Hello Airflow, dan pastikan bahasa default diatur ke Python.

  3. Salin kode Python berikut dan tempelkan ke sel pertama buku catatan.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Tambahkan sel baru di bawah sel pertama dan salin dan tempel kode Python berikut ke dalam sel baru:

    print("hello {}".format(greeting))
    

Membuat pekerjaan

  1. Klik Ikon Alur Kerja Alur Kerja di bilah samping.

  2. Klik Tombol Buat Pekerjaan.

    Tab Tugas muncul dengan dialog buat tugas.

    Membuat dialog tugas pertama

  3. Ganti Tambahkan nama untuk pekerjaan Anda... dengan nama pekerjaan Anda.

  4. Di bidang Nama tugas, masukkan nama untuk tugas; misalnya, tugas-memberi salam.

  5. Di menu drop-down Jenis, pilih Buku Catatan.

  6. Di menu drop-down Sumber, pilih Ruang Kerja.

  7. Klik kotak teks Jalur dan gunakan browser file untuk menemukan buku catatan yang Anda buat, klik nama buku catatan, dan klik Konfirmasi.

  8. Klik Tambahkan di bawah Parameter. Di bidang Kunci, masukkan greeting. Di bidang Nilai, masukkan Airflow user.

  9. Klik Buat tugas.

Di panel Detail pekerjaan, salin nilai ID Pekerjaan. Nilai ini diperlukan untuk memicu pekerjaan dari Airflow.

Menjalankan pekerjaan

Untuk menguji pekerjaan baru Anda di antarmuka pengguna Pekerjaan Azure Databricks, klik Tombol Jalankan Sekarang di sudut kanan atas. Setelah proses selesai, Anda dapat memverifikasi output dengan melihat detail eksekusi pekerjaan.

Membuat DAG Airflow baru

Anda menentukan DAG Airflow dalam file Python. Untuk membuat DAG agar memicu contoh pekerjaan notebook:

  1. Di editor teks atau IDE, buat file baru bernama databricks_dag.py dengan konten berikut:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Ganti JOB_ID dengan nilai ID pekerjaan yang disimpan sebelumnya.

  2. Simpan file itu dalam direktori airflow/dags. Airflow secara otomatis membaca dan menginstal file DAG yang disimpan di airflow/dags/.

Menginstal dan memverifikasi DAG di Airflow

Untuk memicu dan memverifikasi DAG di UI Airflow:

  1. Di jendela browser, buka http://localhost:8080/home. Layar DAGs Airflow muncul.
  2. Temukan databricks_dag dan klik tombol Pause/Unpause DAG untuk menjalankan DAG.
  3. Picu DAG dengan mengklik tombol Picu DAG .
  4. Klik jalankan di kolom Runs untuk melihat status dan detail run.