Bagikan melalui


Menerapkan marka air untuk mengontrol ambang batas pemrosesan data

Artikel ini memperkenalkan konsep dasar marka air dan memberikan rekomendasi untuk menggunakan marka air dalam operasi streaming stateful umum. Anda harus menerapkan marka air ke operasi streaming stateful untuk menghindari perluasan jumlah data yang disimpan dalam status tak terbatas, yang dapat memperkenalkan masalah memori dan meningkatkan latensi pemrosesan selama operasi streaming yang berjalan lama.

Apa itu marka air?

Streaming Terstruktur menggunakan marka air untuk mengontrol ambang batas berapa lama untuk terus memproses pembaruan untuk entitas status tertentu. Contoh umum entitas negara meliputi:

  • Agregasi selama jendela waktu.
  • Kunci unik dalam gabungan antara dua aliran.

Saat Anda mendeklarasikan marka air, Anda menentukan bidang tanda waktu dan ambang marka air pada DataFrame streaming. Saat data baru tiba, manajer status melacak tanda waktu terbaru di bidang yang ditentukan dan memproses semua rekaman dalam ambang keterlambatan.

Contoh berikut menerapkan ambang batas marka air 10 menit ke jumlah berjendela:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Dalam contoh ini:

  • Kolom event_time digunakan untuk menentukan marka air 10 menit dan jendela tumbling 5 menit.
  • Jumlah dikumpulkan untuk setiap id yang diamati untuk setiap jendela 5 menit yang tidak tumpang tindih.
  • Informasi status dipertahankan untuk setiap hitungan hingga akhir jendela lebih lama 10 menit dari yang terbaru diamati event_time.

Penting

Ambang batas marka air menjamin bahwa rekaman yang tiba dalam ambang yang ditentukan diproses sesuai dengan semantik kueri yang ditentukan. Rekaman yang terlambat tiba di luar ambang yang ditentukan mungkin masih diproses menggunakan metrik kueri, tetapi ini tidak dijamin.

Bagaimana marka air memengaruhi waktu dan throughput pemrosesan?

Marka air berinteraksi dengan mode output untuk mengontrol kapan data ditulis ke sink. Karena marka air mengurangi jumlah total informasi status yang akan diproses, penggunaan marka air yang efektif sangat penting untuk throughput streaming stateful yang efisien.

Catatan

Tidak semua mode output didukung untuk semua operasi stateful.

Marka air dan mode output untuk agregasi berjendela

Tabel berikut ini merinci pemrosesan kueri dengan agregasi pada tanda waktu dengan marka air yang ditentukan:

Mode output Perilaku
Lampirkan Baris ditulis ke tabel target setelah ambang batas marka air berlalu. Semua penulisan tertunda berdasarkan ambang keterlambatan. Status agregasi lama dihilangkan setelah ambang batas berlalu.
Pembaruan Baris ditulis ke tabel target saat hasil dihitung, dan dapat diperbarui dan ditimpa saat data baru tiba. Status agregasi lama dihilangkan setelah ambang batas berlalu.
Selesai Status agregasi tidak dihilangkan. Tabel target ditulis ulang dengan setiap pemicu.

Marka air dan output untuk gabungan stream-stream

Gabungan antara beberapa aliran hanya mendukung mode penampingan, dan rekaman yang cocok ditulis di setiap batch yang ditemukan. Untuk gabungan dalam, Databricks merekomendasikan pengaturan ambang marka air pada setiap sumber data streaming. Ini memungkinkan informasi status dibuang untuk rekaman lama. Tanpa marka air, Streaming Terstruktur mencoba menggabungkan setiap kunci dari kedua sisi gabungan dengan setiap pemicu.

Streaming Terstruktur memiliki semantik khusus untuk mendukung gabungan luar. Watermarking wajib untuk gabungan luar, karena menunjukkan kapan kunci harus ditulis dengan nilai null setelah tidak cocok. Perhatikan bahwa meskipun gabungan luar dapat berguna untuk merekam rekaman yang tidak pernah cocok selama pemrosesan data, karena gabungan hanya menulis ke tabel sebagai operasi penambahan, data yang hilang ini tidak direkam sampai setelah ambang keterlambatan berlalu.

Mengontrol ambang batas data terlambat dengan beberapa kebijakan marka air dalam Streaming Terstruktur

Saat bekerja dengan beberapa input Streaming Terstruktur, Anda dapat mengatur beberapa marka air untuk mengontrol ambang toleransi untuk data yang terlambat tiba. Mengonfigurasi marka air memungkinkan Anda mengontrol informasi status dan memengaruhi latensi.

Suatu kueri streaming dapat memiliki beberapa aliran input yang disatukan atau digabungkan bersama. Masing-masing aliran input dapat memiliki ambang data akhir yang berbeda yang perlu ditoleransi untuk operasi stateful. Tentukan ambang batas ini menggunakan withWatermarks("eventTime", delay) pada setiap aliran input. Berikut ini adalah contoh kueri dengan gabungan stream-stream.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Saat menjalankan kueri, Streaming Terstruktur secara individual melacak waktu peristiwa maksimum yang terlihat di setiap aliran input, menghitung marka air berdasarkan penundaan yang sesuai, dan memilih satu marka air global yang akan digunakan untuk operasi stateful. Secara default, yang terendah akan dipilih sebagai watermark global karena untuk memastikan bahwa tidak ada data yang secara tidak sengaja dijatuhkan terlambat jika salah satu aliran tertinggal di belakang yang lain (misalnya, salah satu aliran berhenti menerima data karena kegagalan upstram). Dengan kata lain, marka air global bergerak dengan aman dengan kecepatan aliran terlambat dan output kueri ditunda.

Jika Anda ingin mendapatkan hasil yang lebih cepat, Anda dapat mengatur beberapa kebijakan marka air untuk memilih nilai maksimum sebagai marka air global dengan mengatur konfigurasi spark.sql.streaming.multipleWatermarkPolicy SQL ke max (defaultnya adalah min). Ini memungkinkan watermark global bergerak dengan kecepatan aliran tercepat. Namun, konfigurasi ini menghilangkan data dari aliran terlambat. Karena itu, Databricks merekomendasikan agar Anda menggunakan konfigurasi ini secara peradilan.

Menghilangkan duplikat dalam marka air

Di Databricks Runtime 13.3 LTS ke atas, Anda dapat mendeduplikasi rekaman dalam ambang marka air menggunakan pengidentifikasi unik.

Streaming Terstruktur menyediakan jaminan pemrosesan tepat sekali, tetapi tidak secara otomatis mendeduplikasi rekaman dari sumber data. Anda dapat menggunakan dropDuplicatesWithinWatermark untuk mendeduplikasi rekaman pada bidang tertentu, memungkinkan Anda menghapus duplikat dari aliran meskipun beberapa bidang berbeda (seperti waktu peristiwa atau waktu kedatangan).

Rekaman duplikat yang tiba dalam marka air yang ditentukan dijamin akan dihilangkan. Jaminan ini hanya ketat dalam satu arah, dan rekaman duplikat yang tiba di luar ambang yang ditentukan mungkin juga dihilangkan. Anda harus mengatur ambang penundaan marka air lebih lama dari perbedaan tanda waktu maksimum di antara peristiwa duplikat untuk menghapus semua duplikat.

Anda harus menentukan marka air untuk menggunakan dropDuplicatesWithinWatermark metode , seperti dalam contoh berikut:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")