Pengamatan di Azure Databricks untuk pekerjaan, Alur Deklaratif Lakeflow Spark, dan Lakeflow Connect

Memantau performa, biaya, dan kesehatan aplikasi streaming Anda sangat penting untuk membangun alur ETL yang andal dan efisien. Azure Databricks menyediakan serangkaian fitur pengamatan yang kaya di seluruh Pekerjaan, Alur Deklaratif Lakeflow Spark, dan Lakeflow Connect untuk membantu mendiagnosis hambatan, mengoptimalkan performa, dan mengelola penggunaan dan biaya sumber daya.

Artikel ini menguraikan praktik terbaik di area berikut:

  • Metrik performa streaming utama
  • Skema log peristiwa dan contoh kueri
  • Pemantauan kueri streaming
  • Pengamatan biaya menggunakan tabel sistem
  • Mengekspor log dan metrik ke alat eksternal

Metrik utama untuk pengamatan streaming

Saat mengoperasikan alur streaming, pantau metrik utama berikut:

Metric Purpose
Backpressure Memantau jumlah file dan nilai offset (ukuran). Membantu mengidentifikasi hambatan dan memastikan sistem dapat menangani data masuk tanpa tertinggal.
Throughput Melacak jumlah pesan yang diproses per mikro-batch. Menilai efisiensi jalur pemrosesan data dan memeriksa apakah jalur tersebut sesuai dengan kecepatan penyerapan data.
Duration Mengukur durasi rata-rata mikro-batch. Menunjukkan kecepatan pemrosesan dan membantu menyetel interval batch.
Latency Menunjukkan berapa banyak rekaman/pesan yang diproses dari waktu ke waktu. Membantu memahami penundaan alur end-to-end dan mengoptimalkan agar latensi lebih rendah.
Pemanfaatan kluster Mencerminkan penggunaan CPU dan memori (%). Memastikan penggunaan sumber daya yang efisien dan membantu menskalakan kluster untuk memenuhi tuntutan pemrosesan.
Network Mengukur data yang ditransfer dan diterima. Berguna untuk mengidentifikasi hambatan jaringan dan meningkatkan performa transfer data.
Checkpoint Mengidentifikasi data dan offset yang diproses. Memastikan konsistensi dan memungkinkan toleransi kesalahan selama kegagalan.
Cost Menampilkan biaya per jam, harian, dan bulanan aplikasi streaming. Membantu dalam anggaran dan pengoptimalan sumber daya.
Lineage Menampilkan himpunan data dan lapisan yang dibuat dalam aplikasi streaming. Memfasilitasi transformasi data, pelacakan, jaminan kualitas, dan penelusuran kesalahan.

Log dan metrik kluster

Log dan metrik kluster Azure Databricks memberikan wawasan terperinci tentang performa dan pemanfaatan kluster. Log dan metrik ini mencakup informasi tentang CPU, memori, I/O disk, lalu lintas jaringan, dan metrik sistem lainnya. Memantau metrik ini sangat penting untuk mengoptimalkan performa kluster, mengelola sumber daya secara efisien, dan memecahkan masalah.

Log dan metrik kluster Azure Databricks menawarkan wawasan terperinci tentang performa kluster dan pemanfaatan sumber daya. Ini termasuk penggunaan CPU dan memori, I/O disk, dan lalu lintas jaringan. Memantau metrik ini sangat penting untuk:

  • Mengoptimalkan performa kluster.
  • Mengelola sumber daya secara efisien.
  • Memecahkan masalah operasional.

Metrik dapat dimanfaatkan melalui UI Databricks atau diekspor ke alat pemantauan pribadi. Lihat Contoh buku catatan: Metrik Datadog.

Antarmuka pengguna Spark

UI Spark menunjukkan informasi terperinci tentang kemajuan pekerjaan dan tahapan, termasuk jumlah tugas yang selesai, tertunda, dan gagal. Ini membantu Anda memahami alur eksekusi dan mengidentifikasi hambatan.

Untuk aplikasi streaming, tab Streaming menampilkan metrik seperti laju input, laju pemrosesan, dan durasi batch. Ini membantu Anda memantau performa pekerjaan streaming Anda dan mengidentifikasi masalah penyerapan atau pemrosesan data apa pun.

Lihat Penelusuran kesalahan dengan antarmuka pengguna Spark untuk informasi selengkapnya.

Metrik komputasi

Metrik komputasi akan membantu Anda memahami pemanfaatan kluster. Saat pekerjaan Anda berjalan, Anda dapat melihat bagaimana skalanya dan bagaimana sumber daya Anda terpengaruh. Anda akan dapat menemukan tekanan memori yang dapat menyebabkan kegagalan OOM atau tekanan CPU yang dapat menyebabkan penundaan panjang. Berikut adalah metrik spesifik yang akan Anda lihat:

  • Distribusi Beban Server: Pemanfaatan CPU setiap simpul selama satu menit terakhir.
  • Pemanfaatan CPU: Persentase waktu yang dihabiskan CPU dalam berbagai mode (misalnya, pengguna, sistem, diam, dan iowait).
  • Pemanfaatan Memori: Total penggunaan memori oleh setiap mode (misalnya, digunakan, gratis, buffer, dan di-cache).
  • Pemanfaatan Pertukaran Memori: Total penggunaan pertukaran memori.
  • Ruang Sistem File Gratis: Total penggunaan sistem file oleh setiap titik pemasangan.
  • Throughput Jaringan: Jumlah byte yang diterima dan ditransmisikan melalui jaringan oleh setiap perangkat.
  • Jumlah Simpul Aktif: Jumlah simpul aktif pada setiap tanda waktu untuk komputasi yang diberikan.

Lihat Pemantauan kinerja dan Bagan metrik perangkat keras untuk informasi selengkapnya.

Tabel sistem

Pemantauan biaya

Tabel sistem Azure Databricks menyediakan pendekatan terstruktur untuk memantau biaya dan performa pekerjaan. Tabel ini meliputi:

  • Detail pelaksanaan pekerjaan.
  • Pemanfaatan sumber daya.
  • Biaya terkait.

Gunakan tabel ini untuk memahami dampak kesehatan operasional dan keuangan.

Requirements

Untuk menggunakan tabel sistem untuk pemantauan biaya:

  • Admin akun harus mengaktifkan system.lakeflow schema.
  • Pengguna harus memilih salah satu:
    • Menjadi admin metastore dan admin akun, atau
    • Memiliki izin USE dan SELECT pada skema sistem.

Contoh kueri: Pekerjaan termahal (30 hari terakhir)

Kueri ini mengidentifikasi pekerjaan termahal selama 30 hari terakhir, membantu analisis dan pengoptimalan biaya.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Alur Deklaratif Lakeflow Spark

Log peristiwa Lakeflow Spark Declarative Pipelines merekam catatan komprehensif semua peristiwa dalam alur pipeline, termasuk:

  • Log audit.
  • Pemeriksaan kualitas data.
  • Kemajuan alur.
  • Silsilah data.

Log kejadian diaktifkan secara otomatis untuk semua Alur Deklaratif Lakeflow Spark dan dapat diakses melalui:

  • UI Pipeline: Menampilkan log secara langsung.
  • PIPELINES API: Akses terprogram.
  • Kueri langsung: Mengkueri tabel log peristiwa.

Untuk informasi selengkapnya, lihat skema log peristiwa untuk Alur Deklaratif Lakeflow Spark.

Contoh kueri

Contoh kueri ini membantu memantau performa dan kesehatan alur dengan menyediakan metrik utama seperti durasi batch, throughput, backpressure, dan pemanfaatan sumber daya.

Durasi batch rata-rata

Kueri ini menghitung durasi rata-rata batch yang diproses oleh saluran.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Throughput rata-rata

Kueri ini menghitung rata-rata throughput jalur pemrosesan dalam jumlah baris yang diproses tiap detik.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Kueri ini mengukur tekanan balik alur dengan memeriksa tunggakan data.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Pemanfaatan kluster dan slot

Kueri ini memberikan wawasan mengenai pemanfaatan kluster atau slot yang digunakan oleh pipeline.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Anda dapat memantau kueri streaming dalam pekerjaan melalui Pendengar Kueri Streaming.

Lampirkan pendengar ke sesi Spark untuk mengaktifkan Pendengar Kueri Streaming diAzure Databricks. Pendengar ini akan memantau kemajuan dan metrik kueri streaming Anda. Ini dapat digunakan untuk mendorong metrik ke alat pemantauan eksternal atau mencatatnya untuk analisis lebih lanjut.

Contoh: Mengekspor metrik ke alat pemantauan eksternal

Note

Ini tersedia di Databricks Runtime 11.3 LTS ke atas untuk Python dan Scala.

Anda dapat mengekspor metrik streaming ke layanan eksternal untuk pemberitahuan atau dasbor dengan menggunakan StreamingQueryListener antarmuka.

Berikut adalah contoh dasar tentang cara mengimplementasikan pendengar:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Contoh: Menggunakan pendengar kueri dalam Azure Databricks

Di bawah ini adalah contoh dari log kejadian StreamingQueryListener untuk kueri streaming dari Kafka ke Delta Lake.

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Untuk contoh selengkapnya, lihat: Contoh.

Metrik kemajuan kueri

Metrik kemajuan kueri sangat penting untuk memantau performa dan kesehatan kueri streaming Anda. Metrik ini mencakup jumlah baris input, tingkat pemrosesan, dan berbagai durasi yang terkait dengan eksekusi kueri. Anda dapat mengamati metrik ini dengan melampirkan StreamingQueryListener ke sesi Spark. Pendengar akan memancarkan peristiwa yang berisi metrik ini di akhir setiap epoch streaming.

Misalnya, Anda dapat mengakses metrik menggunakan StreamingQueryProgress.observedMetrics map dalam metode onQueryProgress pendengar. Ini memungkinkan Anda melacak dan menganalisis performa kueri streaming Anda secara real time.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)