Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini menjelaskan konsep dasar penandaan air dan menyediakan rekomendasi mengenai penggunaan penandaan air dalam operasi streaming stateful umum. Anda harus menerapkan marka air ke operasi streaming stateful untuk menghindari perluasan jumlah data yang disimpan dalam state secara tak terbatas, yang dapat menyebabkan masalah memori atau meningkatkan latensi pemrosesan selama operasi streaming yang berjalan lama.
Apa itu marka air?
Streaming Terstruktur menggunakan penanda waktu untuk mengontrol ambang batas durasi berapa lama pembaruan akan diproses untuk entitas status tertentu. Contoh umum entitas negara meliputi:
- Agregasi dalam rentang waktu.
- Kunci unik dalam penggabungan antara dua aliran data.
Saat Anda mendeklarasikan marka air, Anda menentukan bidang stempel waktu dan ambang marka air pada DataFrame streaming. Saat data baru tiba, pengelola status melacak cap waktu terbaru di bidang yang ditentukan dan memproses semua rekaman dalam ambang batas keterlambatan.
Contoh berikut menerapkan ambang batas marka air 10 menit ke jumlah berjendela:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Dalam contoh ini:
- Kolom
event_timedigunakan untuk menentukan marka air 10 menit dan jendela bergulir 5 menit. - Jumlah dihimpun untuk setiap
idyang diamati dalam jendela waktu 5 menit yang tidak tumpang tindih. - Informasi status dipertahankan untuk setiap hitungan hingga akhir jendela lebih lama 10 menit dari
event_timeterbaru yang diamati.
Penting
Ambang batas marka air menjamin bahwa rekaman yang tiba dalam ambang yang ditentukan diproses sesuai dengan semantik kueri yang ditentukan. Rekaman yang terlambat tiba di luar ambang yang ditentukan mungkin masih diproses menggunakan metrik kueri, tetapi ini tidak dijamin.
Bagaimana marka air memengaruhi waktu dan throughput pemrosesan?
Watermark berinteraksi dengan mode keluaran untuk mengontrol kapan data ditulis ke sink. Karena tanda air mengurangi total informasi status yang diproses, penggunaan tanda air yang efektif sangat penting untuk kapasitas aliran data (throughput) streaming stateful yang efisien.
Catatan
Tidak semua mode keluaran didukung untuk seluruh operasi berbasis status.
Tanda air dan mode output untuk agregasi berbingkai
Tabel berikut ini merinci pemrosesan kueri dengan agregasi pada cap waktu dengan watermark yang ditentukan.
| Mode output | Perilaku |
|---|---|
| Lampirkan | Baris ditambahkan ke tabel target setelah melewati ambang batas watermark. Semua penulisan data ditunda berdasarkan ambang keterlambatan. Status agregasi lama dihilangkan setelah ambang batas berlalu. |
| Pemutakhiran | Baris ditulis ke tabel target saat hasil dihitung, dan dapat ditimpa dan diperbarui saat data baru tiba. Status agregasi lama dihilangkan setelah ambang batas berlalu. |
| Selesai | Status agregasi tidak dihilangkan. Tabel target ditulis ulang setiap kali trigger diaktifkan. |
Tanda air dan output untuk penggabungan stream-stream
Gabungan antara beberapa aliran hanya mendukung mode penampingan, dan rekaman yang cocok ditulis di setiap batch yang ditemukan. Untuk join dalam, Databricks merekomendasikan pengaturan batas watermark pada setiap sumber data streaming. Ini memungkinkan informasi status dibuang untuk rekaman lama. Tanpa watermark, Structured Streaming mencoba menggabungkan setiap kunci dari kedua sisi join pada setiap pemicu.
Streaming Terstruktur memiliki semantik khusus untuk mendukung join luar. Watermarking wajib untuk gabungan luar, karena menunjukkan kapan kunci harus ditulis dengan nilai null setelah tidak cocok. Meskipun gabungan luar dapat berguna untuk merekam rekaman yang tidak pernah cocok selama pemrosesan data, karena gabungan hanya menulis ke tabel sebagai operasi penambahan, data yang hilang ini tidak direkam sampai setelah ambang keterlambatan berlalu.
Mengontrol ambang batas data terlambat dengan kebijakan multi watermark dalam Streaming Tersusun
Saat bekerja dengan beberapa input Streaming Terstruktur, Anda dapat mengatur beberapa marka air untuk mengontrol ambang toleransi untuk data yang terlambat tiba. Mengonfigurasi penanda memungkinkan Anda untuk mengontrol informasi status dan memengaruhi latensi.
Suatu kueri streaming dapat memiliki beberapa aliran input yang disatukan atau digabungkan bersama. Masing-masing aliran input dapat memiliki ambang data akhir yang berbeda yang perlu ditoleransi untuk operasi stateful. Tentukan ambang batas ini menggunakan withWatermarks("eventTime", delay) pada setiap aliran input. Berikut ini adalah contoh kueri dengan penggabungan stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Saat menjalankan kueri, Streaming Terstruktur secara tersendiri melacak waktu peristiwa maksimum yang terlihat di setiap aliran input, menghitung tanda air berdasarkan penundaan yang ada, dan memilih satu tanda air global yang akan digunakan untuk operasi stateful. Secara bawaan, minimum dipilih sebagai marka air global karena mencegah data dianggap terlalu lambat dan diabaikan secara tidak sengaja jika salah satu aliran berada di belakang yang lain (misalnya, salah satu aliran berhenti menerima data karena kegagalan hilir). Dengan kata lain, marka air global bergerak dengan aman pada kecepatan aliran paling lambat, dan sebagai akibatnya, output kueri ditunda.
Jika Anda ingin mendapatkan hasil yang lebih cepat, Anda dapat mengatur beberapa kebijakan marka air untuk memilih nilai maksimum sebagai marka air global dengan mengatur konfigurasi SQL spark.sql.streaming.multipleWatermarkPolicy ke max (defaultnya adalah min). Ini memungkinkan marka air global bergerak pada kecepatan aliran tercepat. Namun, konfigurasi ini menghilangkan data dari aliran terlambat. Databricks merekomendasikan penggunaan konfigurasi ini secara bijaksana.
Menerapkan marka air ke operasi yang berbeda
distinct Operator ini adalah operator stateful yang memerlukan watermark untuk mencegah pertumbuhan status yang tidak terbatas. Tanpa marka air, Streaming Terstruktur mencoba melacak setiap rekaman unik tanpa batas waktu, yang dapat menyebabkan masalah memori atau peningkatan latensi pemrosesan.
Saat Anda menerapkan distinct ke DataFrame streaming, Anda harus menentukan watermark pada bidang tanda waktu. Tanda air mengontrol berapa lama pengelola status mempertahankan rekaman untuk deduplikasi. Setelah ambang batas tanda air terlampaui, rekaman lama dihapus dari penyimpanan.
Contoh berikut menerapkan marka air pada operasi distinct.
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Dalam contoh ini, rekaman duplikat yang tiba dalam waktu 1 jam dari yang teramati terbaru eventTime dihapus dari stream. Informasi status untuk deduplikasi dihapus setelah ambang batas terlampaui.
Penting
Jika Anda perlu mendeduplikasi pada kolom tertentu daripada semua kolom, gunakan dropDuplicates() atau dropDuplicatesWithinWatermark() alih-alih distinct. Lihat bagian berikutnya untuk detailnya.
Menghilangkan duplikat dalam tanda air
Di Databricks Runtime 13.3 LTS atau yang lebih baru, Anda dapat menghapus duplikasi rekaman dalam batas ambang watermark menggunakan pengidentifikasi unik.
Streaming Terstruktur menyediakan jaminan pemrosesan tepat-satu-kali, tetapi tidak secara otomatis menghilangkan duplikasi rekaman dari sumber data. Anda dapat menggunakan dropDuplicatesWithinWatermark untuk menghapus duplikasi rekaman berdasarkan bidang tertentu, memungkinkan Anda menghapus duplikat dari aliran meskipun beberapa bidang berbeda (misalnya waktu peristiwa atau waktu kedatangan).
Rekaman duplikat yang tiba dalam ambang batas waktu yang ditentukan dijamin dihapus. Jaminan ini hanya ketat dalam satu arah saja, dan rekaman duplikat yang tiba melebihi batas yang ditentukan mungkin akan dihapus. Anda harus mengatur batas waktu penundaan penanda air lebih lama dari selisih waktu maksimum di antara peristiwa duplikat untuk menghapus semua duplikat.
Anda harus menentukan marka air untuk menggunakan metode dropDuplicatesWithinWatermark, seperti dalam contoh berikut:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))