Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Memantau performa, biaya, dan kesehatan aplikasi streaming Anda sangat penting untuk membangun alur ETL yang andal dan efisien. Azure Databricks menyediakan serangkaian fitur pengamatan yang kaya di seluruh Pekerjaan, Alur Deklaratif Lakeflow Spark, dan Lakeflow Connect untuk membantu mendiagnosis hambatan, mengoptimalkan performa, dan mengelola penggunaan dan biaya sumber daya.
Artikel ini menguraikan praktik terbaik di area berikut:
- Metrik performa streaming utama
- Skema log peristiwa dan contoh kueri
- Pemantauan kueri streaming
- Pengamatan biaya menggunakan tabel sistem
- Mengekspor log dan metrik ke alat eksternal
Metrik utama untuk pengamatan streaming
Saat mengoperasikan alur streaming, pantau metrik utama berikut:
| Metric | Purpose |
|---|---|
| Backpressure | Memantau jumlah file dan nilai offset (ukuran). Membantu mengidentifikasi hambatan dan memastikan sistem dapat menangani data masuk tanpa tertinggal. |
| Throughput | Melacak jumlah pesan yang diproses per mikro-batch. Menilai efisiensi jalur pemrosesan data dan memeriksa apakah jalur tersebut sesuai dengan kecepatan penyerapan data. |
| Duration | Mengukur durasi rata-rata mikro-batch. Menunjukkan kecepatan pemrosesan dan membantu menyetel interval batch. |
| Latency | Menunjukkan berapa banyak rekaman/pesan yang diproses dari waktu ke waktu. Membantu memahami penundaan alur end-to-end dan mengoptimalkan agar latensi lebih rendah. |
| Pemanfaatan kluster | Mencerminkan penggunaan CPU dan memori (%). Memastikan penggunaan sumber daya yang efisien dan membantu menskalakan kluster untuk memenuhi tuntutan pemrosesan. |
| Network | Mengukur data yang ditransfer dan diterima. Berguna untuk mengidentifikasi hambatan jaringan dan meningkatkan performa transfer data. |
| Checkpoint | Mengidentifikasi data dan offset yang diproses. Memastikan konsistensi dan memungkinkan toleransi kesalahan selama kegagalan. |
| Cost | Menampilkan biaya per jam, harian, dan bulanan aplikasi streaming. Membantu dalam anggaran dan pengoptimalan sumber daya. |
| Lineage | Menampilkan himpunan data dan lapisan yang dibuat dalam aplikasi streaming. Memfasilitasi transformasi data, pelacakan, jaminan kualitas, dan penelusuran kesalahan. |
Log dan metrik kluster
Log dan metrik kluster Azure Databricks memberikan wawasan terperinci tentang performa dan pemanfaatan kluster. Log dan metrik ini mencakup informasi tentang CPU, memori, I/O disk, lalu lintas jaringan, dan metrik sistem lainnya. Memantau metrik ini sangat penting untuk mengoptimalkan performa kluster, mengelola sumber daya secara efisien, dan memecahkan masalah.
Log dan metrik kluster Azure Databricks menawarkan wawasan terperinci tentang performa kluster dan pemanfaatan sumber daya. Ini termasuk penggunaan CPU dan memori, I/O disk, dan lalu lintas jaringan. Memantau metrik ini sangat penting untuk:
- Mengoptimalkan performa kluster.
- Mengelola sumber daya secara efisien.
- Memecahkan masalah operasional.
Metrik dapat dimanfaatkan melalui UI Databricks atau diekspor ke alat pemantauan pribadi. Lihat Contoh buku catatan: Metrik Datadog.
Antarmuka pengguna Spark
UI Spark menunjukkan informasi terperinci tentang kemajuan pekerjaan dan tahapan, termasuk jumlah tugas yang selesai, tertunda, dan gagal. Ini membantu Anda memahami alur eksekusi dan mengidentifikasi hambatan.
Untuk aplikasi streaming, tab Streaming menampilkan metrik seperti laju input, laju pemrosesan, dan durasi batch. Ini membantu Anda memantau performa pekerjaan streaming Anda dan mengidentifikasi masalah penyerapan atau pemrosesan data apa pun.
Lihat Penelusuran kesalahan dengan antarmuka pengguna Spark untuk informasi selengkapnya.
Metrik komputasi
Metrik komputasi akan membantu Anda memahami pemanfaatan kluster. Saat pekerjaan Anda berjalan, Anda dapat melihat bagaimana skalanya dan bagaimana sumber daya Anda terpengaruh. Anda akan dapat menemukan tekanan memori yang dapat menyebabkan kegagalan OOM atau tekanan CPU yang dapat menyebabkan penundaan panjang. Berikut adalah metrik spesifik yang akan Anda lihat:
- Distribusi Beban Server: Pemanfaatan CPU setiap simpul selama satu menit terakhir.
- Pemanfaatan CPU: Persentase waktu yang dihabiskan CPU dalam berbagai mode (misalnya, pengguna, sistem, diam, dan iowait).
- Pemanfaatan Memori: Total penggunaan memori oleh setiap mode (misalnya, digunakan, gratis, buffer, dan di-cache).
- Pemanfaatan Pertukaran Memori: Total penggunaan pertukaran memori.
- Ruang Sistem File Gratis: Total penggunaan sistem file oleh setiap titik pemasangan.
- Throughput Jaringan: Jumlah byte yang diterima dan ditransmisikan melalui jaringan oleh setiap perangkat.
- Jumlah Simpul Aktif: Jumlah simpul aktif pada setiap tanda waktu untuk komputasi yang diberikan.
Lihat Pemantauan kinerja dan Bagan metrik perangkat keras untuk informasi selengkapnya.
Tabel sistem
Pemantauan biaya
Tabel sistem Azure Databricks menyediakan pendekatan terstruktur untuk memantau biaya dan performa pekerjaan. Tabel ini meliputi:
- Detail pelaksanaan pekerjaan.
- Pemanfaatan sumber daya.
- Biaya terkait.
Gunakan tabel ini untuk memahami dampak kesehatan operasional dan keuangan.
Requirements
Untuk menggunakan tabel sistem untuk pemantauan biaya:
- Admin akun harus mengaktifkan
system.lakeflow schema. - Pengguna harus memilih salah satu:
- Menjadi admin metastore dan admin akun, atau
- Memiliki izin
USEdanSELECTpada skema sistem.
Contoh kueri: Pekerjaan termahal (30 hari terakhir)
Kueri ini mengidentifikasi pekerjaan termahal selama 30 hari terakhir, membantu analisis dan pengoptimalan biaya.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Alur Deklaratif Lakeflow Spark
Log peristiwa Lakeflow Spark Declarative Pipelines merekam catatan komprehensif semua peristiwa dalam alur pipeline, termasuk:
- Log audit.
- Pemeriksaan kualitas data.
- Kemajuan alur.
- Silsilah data.
Log kejadian diaktifkan secara otomatis untuk semua Alur Deklaratif Lakeflow Spark dan dapat diakses melalui:
- UI Pipeline: Menampilkan log secara langsung.
- PIPELINES API: Akses terprogram.
- Kueri langsung: Mengkueri tabel log peristiwa.
Untuk informasi selengkapnya, lihat skema log peristiwa untuk Alur Deklaratif Lakeflow Spark.
Contoh kueri
Contoh kueri ini membantu memantau performa dan kesehatan alur dengan menyediakan metrik utama seperti durasi batch, throughput, backpressure, dan pemanfaatan sumber daya.
Durasi batch rata-rata
Kueri ini menghitung durasi rata-rata batch yang diproses oleh saluran.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Throughput rata-rata
Kueri ini menghitung rata-rata throughput jalur pemrosesan dalam jumlah baris yang diproses tiap detik.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Kueri ini mengukur tekanan balik alur dengan memeriksa tunggakan data.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Pemanfaatan kluster dan slot
Kueri ini memberikan wawasan mengenai pemanfaatan kluster atau slot yang digunakan oleh pipeline.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Anda dapat memantau kueri streaming dalam pekerjaan melalui Pendengar Kueri Streaming.
Lampirkan pendengar ke sesi Spark untuk mengaktifkan Pendengar Kueri Streaming diAzure Databricks. Pendengar ini akan memantau kemajuan dan metrik kueri streaming Anda. Ini dapat digunakan untuk mendorong metrik ke alat pemantauan eksternal atau mencatatnya untuk analisis lebih lanjut.
Contoh: Mengekspor metrik ke alat pemantauan eksternal
Note
Ini tersedia di Databricks Runtime 11.3 LTS ke atas untuk Python dan Scala.
Anda dapat mengekspor metrik streaming ke layanan eksternal untuk pemberitahuan atau dasbor dengan menggunakan StreamingQueryListener antarmuka.
Berikut adalah contoh dasar tentang cara mengimplementasikan pendengar:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Contoh: Menggunakan pendengar kueri dalam Azure Databricks
Di bawah ini adalah contoh dari log kejadian StreamingQueryListener untuk kueri streaming dari Kafka ke Delta Lake.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Untuk contoh selengkapnya, lihat: Contoh.
Metrik kemajuan kueri
Metrik kemajuan kueri sangat penting untuk memantau performa dan kesehatan kueri streaming Anda. Metrik ini mencakup jumlah baris input, tingkat pemrosesan, dan berbagai durasi yang terkait dengan eksekusi kueri. Anda dapat mengamati metrik ini dengan melampirkan StreamingQueryListener ke sesi Spark. Pendengar akan memancarkan peristiwa yang berisi metrik ini di akhir setiap epoch streaming.
Misalnya, Anda dapat mengakses metrik menggunakan StreamingQueryProgress.observedMetrics map dalam metode onQueryProgress pendengar. Ini memungkinkan Anda melacak dan menganalisis performa kueri streaming Anda secara real time.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)