Mengoptimalkan pemrosesan stateful di Tabel Langsung Delta dengan marka air
Untuk mengelola data yang disimpan dalam status secara efektif, gunakan marka air saat melakukan pemrosesan aliran stateful di Tabel Langsung Delta, termasuk agregasi, gabungan, dan deduplikasi. Artikel ini menjelaskan cara menggunakan marka air dalam kueri Tabel Langsung Delta Anda dan menyertakan contoh operasi yang direkomendasikan.
Catatan
Untuk memastikan kueri yang melakukan agregasi diproses secara bertahap dan tidak sepenuhnya dikomputasi ulang dengan setiap pembaruan, Anda harus menggunakan marka air.
Apa itu marka air?
Dalam pemrosesan aliran, marka air adalah fitur Apache Spark yang dapat menentukan ambang batas berbasis waktu untuk memproses data saat melakukan operasi stateful seperti agregasi. Data yang tiba diproses hingga ambang tercapai, di mana titik waktu yang ditentukan oleh ambang ditutup. Marka air dapat digunakan untuk menghindari masalah selama pemrosesan kueri, terutama saat memproses himpunan data yang lebih besar atau pemrosesan yang berjalan lama. Masalah ini dapat mencakup latensi tinggi dalam menghasilkan hasil dan bahkan kesalahan kehabisan memori (OOM) karena jumlah data yang disimpan dalam status selama pemrosesan. Karena data streaming secara inheren tidak diurutkan, marka air juga mendukung penghitungan operasi dengan benar seperti agregasi jendela waktu.
Untuk mempelajari selengkapnya tentang menggunakan marka air dalam pemrosesan aliran, lihat Watermarking di Streaming Terstruktur Apache Spark dan Menerapkan marka air untuk mengontrol ambang batas pemrosesan data.
Bagaimana Anda mendefinisikan marka air?
Anda menentukan marka air dengan menentukan bidang tanda waktu dan nilai yang mewakili ambang waktu agar data terlambat tiba. Data dianggap terlambat jika tiba setelah ambang waktu yang ditentukan. Misalnya, jika ambang batas didefinisikan sebagai 10 menit, rekaman yang tiba setelah ambang batas 10 menit mungkin dihilangkan.
Karena rekaman yang tiba setelah ambang batas yang ditentukan mungkin dihilangkan, memilih ambang batas yang memenuhi persyaratan latensi vs. kebenaran Anda penting. Memilih ambang yang lebih kecil menghasilkan rekaman yang dipancarkan lebih cepat tetapi juga berarti rekaman yang terlambat lebih mungkin dihilangkan. Ambang yang lebih besar berarti menunggu lebih lama tetapi mungkin lebih banyak kelengkapan data. Karena ukuran status yang lebih besar, ambang batas yang lebih besar mungkin juga memerlukan sumber daya komputasi tambahan. Karena nilai ambang batas tergantung pada persyaratan data dan pemrosesan Anda, pengujian dan pemantauan pemrosesan Anda penting untuk menentukan ambang batas yang optimal.
Anda menggunakan withWatermark()
fungsi di Python untuk menentukan marka air. Di SQL, gunakan WATERMARK
klausul untuk menentukan marka air:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Menggunakan marka air dengan gabungan stream-stream
Untuk gabungan stream-stream, Anda harus menentukan marka air di kedua sisi klausa interval waktu dan gabungan. Karena setiap sumber gabungan memiliki tampilan data yang tidak lengkap, klausa interval waktu diperlukan untuk memberi tahu mesin streaming ketika tidak ada kecocokan lebih lanjut yang dapat dilakukan. Klausa interval waktu harus menggunakan bidang yang sama yang digunakan untuk menentukan marka air.
Karena mungkin ada kalanya setiap aliran memerlukan ambang batas yang berbeda untuk marka air, aliran tidak perlu memiliki ambang yang sama. Untuk menghindari data yang hilang, mesin streaming mempertahankan satu marka air global berdasarkan aliran paling lambat.
Contoh berikut menggabungkan streaming tayangan iklan dan streaming klik pengguna pada iklan. Dalam contoh ini, klik harus terjadi dalam waktu 3 menit setelah tayangan. Setelah interval waktu 3 menit berlalu, baris dari status yang tidak dapat lagi dicocokkan akan dihilangkan.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Melakukan agregasi berjendela dengan marka air
Operasi stateful umum pada data streaming adalah agregasi berjendela. Agregasi berjendela mirip dengan agregasi yang dikelompokkan, kecuali bahwa nilai agregat dikembalikan untuk kumpulan baris yang merupakan bagian dari jendela yang ditentukan.
Jendela dapat didefinisikan sebagai panjang tertentu, dan operasi agregasi dapat dilakukan pada semua baris yang merupakan bagian dari jendela tersebut. Spark Streaming mendukung tiga jenis jendela:
- Jendela tumbling (tetap): Serangkaian interval waktu tetap, tidak tumpang tindih, dan berdampingan. Rekaman input hanya milik satu jendela.
- Jendela geser: Mirip dengan jendela tumbling, jendela geser berukuran tetap, tetapi jendela dapat tumpang tindih, dan rekaman dapat jatuh ke beberapa jendela.
Ketika data tiba melewati akhir jendela ditambah panjang marka air, tidak ada data baru yang diterima untuk jendela, hasil agregasi dipancarkan, dan status untuk jendela dihilangkan.
Contoh berikut menghitung jumlah tayangan setiap 5 menit menggunakan jendela tetap. Dalam contoh ini, klausa pilih menggunakan alias impressions_window
, lalu jendela itu sendiri didefinisikan sebagai bagian GROUP BY
dari klausa. Jendela harus didasarkan pada kolom tanda waktu yang sama dengan marka air, clickTimestamp
kolom dalam contoh ini.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Contoh serupa dalam Python untuk menghitung laba melalui jendela tetap per jam:
import dlt
@dlt.table()
def profit_by_hour():
return (
dlt.read_stream("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplikasi rekaman streaming
Streaming Terstruktur memiliki jaminan pemrosesan sekali persis sekali tetapi tidak secara otomatis membatalkan duplikat rekaman dari sumber data. Misalnya, karena banyak antrean pesan memiliki setidaknya sekali jaminan, rekaman duplikat harus diharapkan saat membaca dari salah satu antrean pesan ini. Anda dapat menggunakan dropDuplicatesWithinWatermark()
fungsi untuk mende-duplikat rekaman pada bidang tertentu, menghapus duplikat dari aliran bahkan jika beberapa bidang berbeda (seperti waktu peristiwa atau waktu kedatangan). Anda harus menentukan marka air untuk menggunakan fungsi .dropDuplicatesWithinWatermark()
Semua data duplikat yang tiba dalam rentang waktu yang ditentukan oleh marka air dihilangkan.
Data yang diurutkan penting karena data yang tidak berurutan menyebabkan nilai marka air melompat ke depan dengan tidak benar. Kemudian, ketika data yang lebih lama tiba, data dianggap terlambat dan dihilangkan. withEventTimeOrder
Gunakan opsi untuk memproses rekam jepret awal secara berurutan berdasarkan tanda waktu yang ditentukan dalam marka air. Opsi withEventTimeOrder
dapat dideklarasikan dalam kode yang menentukan himpunan data atau dalam pengaturan alur menggunakan spark.databricks.delta.withEventTimeOrder.enabled
. Contoh:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Catatan
Opsi withEventTimeOrder
ini hanya didukung dengan Python.
Dalam contoh berikut, data diproses berdasarkan clickTimestamp
, dan rekaman yang tiba dalam waktu 5 detik satu sama lain yang berisi duplikat userId
dan clickAdId
kolom dihilangkan.
clicksDedupDf = (
spark.readStream
.option("withEventTimeOrder", "true")
.table(rawClicks)
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Mengoptimalkan konfigurasi alur untuk pemrosesan stateful
Untuk membantu mencegah masalah produksi dan latensi yang berlebihan, Databricks merekomendasikan untuk mengaktifkan manajemen status berbasis RocksDB untuk pemrosesan aliran stateful Anda, terutama jika pemrosesan Anda memerlukan penghematan sejumlah besar status menengah. Untuk mengaktifkan penyimpanan status RocksDB, lihat Mengaktifkan penyimpanan status RocksDB untuk Tabel Langsung Delta.