Menjalankan pipeline dalam alur kerja

Anda dapat menjalankan alur sebagai bagian dari alur kerja pemrosesan data dengan Lakeflow Jobs, Apache Airflow, atau Azure Data Factory.

Pekerjaan

Anda dapat mengatur beberapa tugas dalam pekerjaan Databricks untuk menerapkan alur kerja pemrosesan data. Untuk menyertakan alur dalam pekerjaan, gunakan tugas Alur saat Anda membuat pekerjaan. Lihat Tugas pipeline untuk pekerjaan.

Apache Airflow

Apache Airflow adalah solusi sumber terbuka untuk mengelola dan menjadwalkan alur kerja data. Airflow mewakili alur kerja sebagai graf asiklik (DAG) operasi yang diarahkan. Anda menentukan alur kerja dalam file Python dan Airflow mengelola penjadwalan dan eksekusi. Untuk informasi tentang menginstal dan menggunakan Airflow dengan Azure Databricks, lihat Mengatur Pekerjaan Lakeflow dengan Apache Airflow.

Untuk menjalankan alur sebagai bagian dari alur kerja Airflow, gunakan DatabricksSubmitRunOperator.

Persyaratan

Berikut adalah persyaratan untuk menggunakan dukungan Airflow pada Lakeflow Spark Declarative Pipelines:

  • Airflow versi 2.1.0 atau yang lebih baru.
  • Paket penyedia Databricks versi 2.1.0 atau yang lebih baru.

Example

Contoh berikut membuat Airflow DAG yang memicu pembaruan untuk jalur pemrosesan dengan pengidentifikasi 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Ganti CONNECTION_ID dengan pengidentifikasi untuk koneksi Airflow ke ruang kerja Anda.

Simpan contoh ini di airflow/dags direktori dan gunakan UI Airflow untuk melihat dan memicu DAG. Gunakan UI alur untuk melihat detail pembaruan alur.

Azure Data Factory

Nota

Lakeflow Spark Declarative Pipelines dan Azure Data Factory masing-masing menyertakan opsi untuk mengonfigurasi jumlah percobaan ulang saat kegagalan terjadi. Jika nilai coba lagi dikonfigurasi pada alur Anda dan pada aktivitas Azure Data Factory yang memanggil alur, jumlah coba lagi diperoleh dengan mengalikan nilai coba lagi Azure Data Factory dengan nilai coba lagi alur.

Misalnya, jika pembaruan alur gagal, Lakeflow Spark Declarative Pipelines mencoba kembali pembaruan hingga lima kali secara default. Jika pengulangan di Azure Data Factory diatur ke tiga, dan pipeline Anda menggunakan default lima percobaan ulang, pipeline yang gagal mungkin akan dicoba ulang hingga lima belas kali. Untuk menghindari upaya coba lagi yang berlebihan saat pembaruan alur gagal, Databricks merekomendasikan untuk membatasi jumlah percobaan ulang saat mengonfigurasi alur atau aktivitas Azure Data Factory yang memanggil alur.

Untuk mengubah konfigurasi pengulangan untuk alur Anda, gunakan parameter pipelines.numUpdateRetryAttempts saat mengonfigurasi alur.

Azure Data Factory adalah layanan ETL berbasis cloud yang memungkinkan Anda mengatur alur kerja integrasi dan transformasi data. Azure Data Factory secara langsung mendukung menjalankan tugas Azure Databricks dalam alur kerja, termasuk notebook, tugas JAR, dan skrip Python. Anda juga dapat menambahkan pipeline ke dalam alur kerja dengan memanggil API REST pipeline dari Aktivitas web Azure Data Factory. Misalnya, untuk memicu pembaruan alur dari Azure Data Factory:

  1. Buat pabrik data atau buka pabrik data yang ada.

  2. Saat pembuatan selesai, buka halaman pabrik data Anda dan klik ubin Buka Azure Data Factory Studio. Antarmuka pengguna Azure Data Factory muncul.

  3. Buat alur Azure Data Factory baru dengan memilih Alur dari menu drop-down Baru di antarmuka pengguna Azure Data Factory Studio.

  4. Di kotak alat Aktivitas , perluas Umum dan seret aktivitas Web ke kanvas alur. Klik tab Pengaturan dan masukkan nilai berikut ini:

    Nota

    Sebagai praktik terbaik keamanan, saat Anda mengautentikasi dengan alat, sistem, skrip, dan aplikasi otomatis, Databricks merekomendasikan agar Anda menggunakan token akses pribadi milik perwakilan layanan , bukan pengguna ruang kerja. Untuk membuat token untuk perwakilan layanan, lihat Mengelola token untuk perwakilan layanan.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Ganti <get-workspace-instance>.

      Ganti <pipeline-id> dengan pengidentifikasi alur.

    • Metode: Pilih POST dari menu drop-down.

    • Header: Klik + Baru. Dalam kotak teks Nama , masukkan Authorization. Dalam kotak teks Nilai , masukkan Bearer <personal-access-token>.

      Ganti <personal-access-token> dengan token akses pribadi Azure Databricks.

    • Isi: Untuk meneruskan parameter permintaan tambahan, masukkan dokumen JSON yang berisi parameter. Misalnya, untuk memulai pembaruan dan memproses ulang semua data untuk alur: {"full_refresh": "true"}. Jika tidak ada parameter permintaan tambahan, masukkan kurung kurawal kosong ({}).

Untuk menguji aktivitas Web, klik Debug pada toolbar alur di antarmuka pengguna Data Factory. Output dan status eksekusi, termasuk kesalahan, ditampilkan di tab Output dari alur Azure Data Factory. Gunakan antarmuka pengguna pipeline untuk melihat detail pembaruan pipeline.

Petunjuk / Saran

Persyaratan alur kerja umum adalah memulai tugas setelah menyelesaikan tugas sebelumnya. Karena permintaan pipeline updates bersifat asinkron—permintaan kembali setelah memulai pembaruan tetapi sebelum pembaruan selesai—maka tugas dalam pipeline Azure Data Factory Anda yang bergantung pada pembaruan pipeline tersebut harus menunggu hingga pembaruan tersebut selesai. Opsi untuk menunggu penyelesaian pembaruan adalah menambahkan aktivitas Until setelah aktivitas Web yang memicu pembaruan Alur Deklaratif Lakeflow Spark. Dalam aktivitas Until:

  1. Tambahkan aktivitas Tunggu untuk menunggu jumlah detik yang dikonfigurasi untuk penyelesaian pembaruan.
  2. Tambahkan aktivitas Web setelah aktivitas Tunggu yang menggunakan permintaan detail pembaruan alur untuk mendapatkan status pembaruan. Bidang state dalam respons mengembalikan status pembaruan saat ini, termasuk jika telah selesai.
  3. Gunakan nilai state bidang untuk mengatur kondisi penghentian untuk aktivitas Until. Anda juga dapat menggunakan aktivitas Atur Variabel untuk menetapkan variabel pipeline berdasarkan nilai state dan menggunakan variabel ini sebagai kondisi penghentian.