Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini mencakup penyetelan komputasi, teknik untuk mengurangi latensi end-to-end, dan pendekatan untuk mengukur performa kueri dalam mode real-time.
Penyetelan komputasi
Saat Anda mengonfigurasi komputasi, pertimbangkan hal berikut:
- Tidak seperti mode mikro-batch, tugas real-time dapat tidak aktif sementara menunggu data, sehingga penentuan ukuran yang tepat sangat penting untuk menghindari pemborosan sumber daya.
- Bertujuan untuk tingkat pemanfaatan kluster target, seperti 50%, dengan menyetel:
-
maxPartitions(untuk Kafka) -
spark.sql.shuffle.partitions(untuk tahap acak)
-
- Databricks merekomendasikan pengaturan
maxPartitionssehingga setiap tugas menangani beberapa partisi Kafka untuk mengurangi overhead. - Sesuaikan slot tugas per pekerja agar sesuai dengan beban kerja untuk pekerjaan satu tahap sederhana.
- Untuk pekerjaan yang banyak memanfaatkan shuffle, lakukan eksperimen untuk menemukan jumlah minimum partisi acak yang menghindari backlog dan menyesuaikan berdasarkan hasil. Komputasi tidak akan menjadwalkan pekerjaan jika tidak memiliki slot yang cukup.
Nota
Sejak Databricks Runtime versi 16.4 LTS dan versi lebih tinggi, semua alur real-time menggunakan titik pemeriksaan v2 untuk memungkinkan peralihan yang mulus antara mode real-time dan micro-batch.
Pengoptimalan latensi
Mode real-time Streaming Terstruktur memiliki teknik opsional untuk mengurangi latensi end-to-end. Keduanya tidak diaktifkan secara default. Anda harus mengaktifkannya secara terpisah.
- Pelacakan kemajuan asinkron: Memindahkan penulisan ke offset dan log komit ke dalam utas asinkron, mengurangi waktu antar-batch untuk kueri tanpa status.
- Pemeriksaan status secara asinkron: Memulai pemrosesan mikro-batch berikutnya segera setelah komputasi selesai, tanpa menunggu pemeriksaan status, mengurangi latensi untuk kueri dengan status.
Pemantauan dan pengamatan
Dalam mode real time, metrik durasi batch tradisional tidak mencerminkan latensi end-to-end yang sebenarnya. Gunakan pendekatan di bawah ini untuk mengukur latensi secara akurat dan mengidentifikasi hambatan dalam kueri Anda.
Latensi end-to-end tergantung pada beban kerja dan terkadang hanya dapat diukur secara akurat dengan logika bisnis. Misalnya, jika tanda waktu sumber adalah output di Kafka, Anda dapat menghitung latensi sebagai perbedaan antara tanda waktu output Kafka dan tanda waktu sumber.
Metrik bawaan dengan StreamingQueryProgress
Kejadian StreamingQueryProgress ini secara otomatis masuk ke log driver dan dapat diakses melalui StreamingQueryListenerfungsi panggilan balik.onQueryProgress() Ini memungkinkan Anda untuk bereaksi terhadap peristiwa kemajuan secara terprogram, misalnya, jika Anda ingin menerbitkan metrik ke sistem pemantauan eksternal.
QueryProgressEvent.json() atau toString() sertakan metrik mode real-time ini:
-
Latensi pemrosesan (
processingLatencyMs). Waktu yang berlalu antara saat kueri mode real-time membaca sebuah rekaman dan ketika kueri menulisnya ke tahap berikutnya atau proses selanjutnya. Untuk kueri tahap tunggal, ini mengukur durasi yang sama dengan latensi ujung-ke-ujung. Sistem melaporkan metrik ini per tugas. -
Latensi antrean sumber (
sourceQueuingLatencyMs). Jumlah waktu yang berlalu antara saat sistem menulis rekaman ke bus pesan—misalnya, waktu penambahan log dalam Kafka—dan ketika kueri mode real-time pertama kali membaca rekaman. Sistem melaporkan metrik ini per tugas. -
Latensi end-to-end (
e2eLatencyMs). Waktu antara saat sistem menulis rekaman ke bus pesan dan saat kueri mode waktu nyata memproses rekaman di hilir. Sistem menggabungkan metrik ini setiap batch di semua data yang diproses oleh semua tugas.
Contohnya:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Pengukuran latensi kustom dengan API Observe
Api Observe memungkinkan Anda mengukur latensi sebaris tanpa meluncurkan pekerjaan terpisah. Jika Anda memiliki tanda waktu sumber yang memperkirakan waktu kedatangan data sumber, Anda dapat memperkirakan latensi per batch dengan merekam tanda waktu sebelum sink dan menghitung perbedaan. Hasilnya muncul dalam laporan kemajuan dan tersedia untuk audiens.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Contoh output:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}