Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Untuk mengelola data yang disimpan dalam status secara efektif, gunakan penanda waktu saat melakukan pemrosesan aliran berstatus di Lakeflow Spark Declarative Pipelines, termasuk agregasi, penggabungan, dan deduplikasi. Artikel ini menjelaskan cara menggunakan marka air dalam kueri alur Anda dan menyertakan contoh operasi yang direkomendasikan.
Nota
Untuk memastikan kueri yang melakukan agregasi diproses secara inkremental dan tidak sepenuhnya dikomputasi ulang dengan setiap pembaruan, Anda harus menggunakan penanda.
Apa itu marka air?
Dalam pemrosesan aliran, marka air adalah fitur Apache Spark yang dapat menentukan ambang batas berbasis waktu untuk memproses data saat melakukan operasi stateful seperti agregasi. Data yang tiba diproses hingga ambang tercapai, pada saat itu jendela waktu yang ditentukan oleh ambang tersebut ditutup. Watermark dapat digunakan untuk menghindari masalah selama pemrosesan query, terutama saat memproses himpunan data yang lebih besar atau pemrosesan yang berjalan lama. Masalah ini dapat mencakup latensi tinggi dalam menghasilkan hasil dan bahkan kesalahan kehabisan memori (OOM) karena jumlah data yang disimpan dalam status selama pemrosesan. Karena data streaming secara alami tidak diurutkan, penanda air juga mendukung penghitungan operasi dengan benar seperti agregasi berbasis jendela waktu.
Untuk mempelajari selengkapnya tentang menggunakan marka air dalam pemrosesan aliran, lihat Watermarking di Streaming Terstruktur Apache Spark dan Menerapkan marka air untuk mengontrol ambang batas pemrosesan data.
Bagaimana Anda mendefinisikan tanda air?
Anda menentukan marka air dengan menentukan kolom tanda waktu dan nilai yang menunjukkan ambang waktu untuk kedatangan data terlambat. Data dianggap terlambat jika tiba setelah ambang waktu yang ditentukan. Misalnya, jika ambang batas didefinisikan sebagai 10 menit, rekaman yang tiba setelah ambang batas 10 menit mungkin dihilangkan.
Karena rekaman yang tiba setelah ambang batas yang ditentukan mungkin dihilangkan, memilih ambang batas yang memenuhi persyaratan latensi vs. kebenaran Anda penting. Memilih ambang yang lebih kecil menghasilkan rekaman yang dipancarkan lebih cepat tetapi juga berarti rekaman yang terlambat lebih mungkin dihilangkan. Ambang yang lebih besar berarti menunggu lebih lama tetapi mungkin lebih banyak kelengkapan data. Karena ukuran status yang lebih besar, ambang batas yang lebih besar mungkin juga memerlukan sumber daya komputasi tambahan. Karena nilai ambang batas tergantung pada persyaratan data dan pemrosesan Anda, pengujian dan pemantauan pemrosesan Anda penting untuk menentukan ambang batas yang optimal.
Anda menggunakan fungsi withWatermark() di Python untuk menentukan tanda air. Di SQL, gunakan WATERMARK klausa untuk mendefinisikan watermark:
Phyton
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Menggunakan marka air dengan gabungan stream-stream
Untuk gabungan stream-stream, Anda harus menentukan marka air di kedua sisi gabungan dan klausa interval waktu. Karena setiap sumber gabungan memiliki tampilan data yang tidak lengkap, klausa interval waktu diperlukan untuk memberi tahu mesin streaming ketika tidak ada kecocokan lebih lanjut yang dapat dilakukan. Klausa interval waktu harus menggunakan elemen yang sama yang digunakan untuk mendefinisikan penanda waktu.
Karena mungkin ada kalanya setiap aliran memerlukan ambang batas yang berbeda untuk marka air, aliran tidak perlu memiliki ambang yang sama. Untuk menghindari data yang hilang, mesin aliran mempertahankan satu penanda global berdasarkan aliran paling lambat.
Contoh berikut menggabungkan streaming tayangan iklan dan streaming klik pengguna pada iklan. Dalam contoh ini, klik harus terjadi dalam waktu 3 menit setelah tayangan. Setelah interval waktu 3 menit berlalu, baris dari status yang tidak dapat lagi dicocokkan akan dihilangkan.
Phyton
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Gunakan agregasi berjendela dengan penanda waktu
Operasi stateful yang umum pada data streaming adalah agregasi berjendela. Agregasi berjendela mirip dengan agregasi yang dikelompokkan, kecuali bahwa nilai agregat dikembalikan untuk kumpulan baris yang merupakan bagian dari jendela yang ditentukan.
Jendela dapat didefinisikan sebagai panjang tertentu, dan operasi agregasi dapat dilakukan pada semua baris yang merupakan bagian dari jendela tersebut. Spark Streaming mendukung tiga jenis jendela:
- Jendela Tumbling (tetap): Serangkaian interval waktu dengan ukuran tetap, tidak tumpang tindih, dan saling bersebelahan. Rekaman input hanya milik satu jendela.
- Jendela geser: Mirip dengan jendela bergulir, jendela geser berukuran tetap, tetapi jendela-jendela ini dapat saling tumpang tindih, dan sebuah catatan dapat masuk ke beberapa jendela.
Ketika data tiba setelah periode watermark melewati akhir jendela, tidak ada data baru yang diterima untuk jendela tersebut, hasil agregasi dihasilkan, dan status jendela dihapus.
Contoh berikut menghitung jumlah tayangan setiap 5 menit menggunakan jendela tetap. Dalam contoh ini, klausa pilih menggunakan alias impressions_window, lalu jendela itu sendiri didefinisikan sebagai bagian GROUP BY dari klausa. Jendela harus didasarkan pada kolom tanda waktu yang sama dengan marka air, kolom clickTimestamp dalam contoh ini.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Contoh serupa dalam Python untuk menghitung laba dalam jendela waktu tetap setiap jam:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Menghilangkan duplikasi rekaman streaming
Streaming Terstruktur memiliki jaminan pemrosesan tepat sekali tetapi tidak secara otomatis menghapus duplikat rekaman dari sumber data. Misalnya, karena banyak antrean pesan memiliki jaminan setidaknya sekali, rekaman duplikat sebaiknya diantisipasi saat membaca dari salah satu antrean pesan ini. Anda dapat menggunakan fungsi dropDuplicatesWithinWatermark() untuk mendeduplikat rekaman di bidang tertentu, menghapus duplikat dari aliran bahkan jika beberapa bidang berbeda (seperti waktu peristiwa atau waktu kedatangan). Anda harus menentukan tanda air untuk menggunakan fungsi dropDuplicatesWithinWatermark(). Semua data duplikat yang datang dalam rentang waktu yang ditentukan oleh penanda waktu dihapus.
Data yang diurutkan penting karena data yang tidak berurutan menyebabkan nilai penanda waktu bergerak maju secara salah. Kemudian, ketika data yang lebih lama tiba, data tersebut dinyatakan terlambat dan dihilangkan. Gunakan opsi withEventTimeOrder untuk memproses snapshot awal secara berurutan berdasarkan tanda waktu yang ditentukan dalam watermark. Opsi withEventTimeOrder dapat dideklarasikan dalam kode yang menentukan himpunan data atau dalam pengaturan alur menggunakan spark.databricks.delta.withEventTimeOrder.enabled. Contohnya:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Nota
Opsi withEventTimeOrder ini hanya didukung dengan Python.
Dalam contoh berikut, data diproses diurutkan berdasarkan clickTimestamp, dan rekaman yang tiba dalam jarak 5 detik satu sama lain serta berisi kolom userId dan clickAdId yang duplikat akan dihapus.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Mengoptimalkan konfigurasi alur untuk pemrosesan berstatus
Untuk membantu mencegah masalah produksi dan latensi yang berlebihan, Databricks merekomendasikan mengaktifkan manajemen keadaan berbasis RocksDB pada pemrosesan aliran yang bersifat stateful Anda, terutama jika pemrosesan tersebut memerlukan penyimpanan sejumlah besar keadaan menengah.
Alur tanpa server secara otomatis mengelola konfigurasi penyimpanan status.
Anda dapat mengaktifkan manajemen status berbasis RocksDB dengan mengatur konfigurasi berikut sebelum menyebarkan alur:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Untuk mempelajari selengkapnya tentang penyimpanan status RocksDB, termasuk rekomendasi konfigurasi untuk RocksDB, lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.