Mengoptimalkan pemrosesan stateful di Tabel Langsung Delta dengan marka air

Untuk mengelola data yang disimpan dalam status secara efektif, gunakan marka air saat melakukan pemrosesan aliran stateful di Tabel Langsung Delta, termasuk agregasi, gabungan, dan deduplikasi. Artikel ini menjelaskan cara menggunakan marka air dalam kueri Tabel Langsung Delta Anda dan menyertakan contoh operasi yang direkomendasikan.

Catatan

Untuk memastikan kueri yang melakukan agregasi diproses secara bertahap dan tidak sepenuhnya dikomputasi ulang dengan setiap pembaruan, Anda harus menggunakan marka air.

Apa itu marka air?

Dalam pemrosesan aliran, marka air adalah fitur Apache Spark yang dapat menentukan ambang batas berbasis waktu untuk memproses data saat melakukan operasi stateful seperti agregasi. Data yang tiba diproses hingga ambang tercapai, di mana titik waktu yang ditentukan oleh ambang ditutup. Marka air dapat digunakan untuk menghindari masalah selama pemrosesan kueri, terutama saat memproses himpunan data yang lebih besar atau pemrosesan yang berjalan lama. Masalah ini dapat mencakup latensi tinggi dalam menghasilkan hasil dan bahkan kesalahan kehabisan memori (OOM) karena jumlah data yang disimpan dalam status selama pemrosesan. Karena data streaming secara inheren tidak diurutkan, marka air juga mendukung penghitungan operasi dengan benar seperti agregasi jendela waktu.

Untuk mempelajari selengkapnya tentang menggunakan marka air dalam pemrosesan aliran, lihat Watermarking di Streaming Terstruktur Apache Spark dan Menerapkan marka air untuk mengontrol ambang batas pemrosesan data.

Bagaimana Anda mendefinisikan marka air?

Anda menentukan marka air dengan menentukan bidang tanda waktu dan nilai yang mewakili ambang waktu agar data terlambat tiba. Data dianggap terlambat jika tiba setelah ambang waktu yang ditentukan. Misalnya, jika ambang batas didefinisikan sebagai 10 menit, rekaman yang tiba setelah ambang batas 10 menit mungkin dihilangkan.

Karena rekaman yang tiba setelah ambang batas yang ditentukan mungkin dihilangkan, memilih ambang batas yang memenuhi persyaratan latensi vs. kebenaran Anda penting. Memilih ambang yang lebih kecil menghasilkan rekaman yang dipancarkan lebih cepat tetapi juga berarti rekaman yang terlambat lebih mungkin dihilangkan. Ambang yang lebih besar berarti menunggu lebih lama tetapi mungkin lebih banyak kelengkapan data. Karena ukuran status yang lebih besar, ambang batas yang lebih besar mungkin juga memerlukan sumber daya komputasi tambahan. Karena nilai ambang batas tergantung pada persyaratan data dan pemrosesan Anda, pengujian dan pemantauan pemrosesan Anda penting untuk menentukan ambang batas yang optimal.

Anda menggunakan withWatermark() fungsi di Python untuk menentukan marka air. Di SQL, gunakan WATERMARK klausul untuk menentukan marka air:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Menggunakan marka air dengan gabungan stream-stream

Untuk gabungan stream-stream, Anda harus menentukan marka air di kedua sisi klausa interval waktu dan gabungan. Karena setiap sumber gabungan memiliki tampilan data yang tidak lengkap, klausa interval waktu diperlukan untuk memberi tahu mesin streaming ketika tidak ada kecocokan lebih lanjut yang dapat dilakukan. Klausa interval waktu harus menggunakan bidang yang sama yang digunakan untuk menentukan marka air.

Karena mungkin ada kalanya setiap aliran memerlukan ambang batas yang berbeda untuk marka air, aliran tidak perlu memiliki ambang yang sama. Untuk menghindari data yang hilang, mesin streaming mempertahankan satu marka air global berdasarkan aliran paling lambat.

Contoh berikut menggabungkan streaming tayangan iklan dan streaming klik pengguna pada iklan. Dalam contoh ini, klik harus terjadi dalam waktu 3 menit setelah tayangan. Setelah interval waktu 3 menit berlalu, baris dari status yang tidak dapat lagi dicocokkan akan dihilangkan.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Melakukan agregasi berjendela dengan marka air

Operasi stateful umum pada data streaming adalah agregasi berjendela. Agregasi berjendela mirip dengan agregasi yang dikelompokkan, kecuali bahwa nilai agregat dikembalikan untuk kumpulan baris yang merupakan bagian dari jendela yang ditentukan.

Jendela dapat didefinisikan sebagai panjang tertentu, dan operasi agregasi dapat dilakukan pada semua baris yang merupakan bagian dari jendela tersebut. Spark Streaming mendukung tiga jenis jendela:

  • Jendela tumbling (tetap): Serangkaian interval waktu tetap, tidak tumpang tindih, dan berdampingan. Rekaman input hanya milik satu jendela.
  • Jendela geser: Mirip dengan jendela tumbling, jendela geser berukuran tetap, tetapi jendela dapat tumpang tindih, dan rekaman dapat jatuh ke beberapa jendela.

Ketika data tiba melewati akhir jendela ditambah panjang marka air, tidak ada data baru yang diterima untuk jendela, hasil agregasi dipancarkan, dan status untuk jendela dihilangkan.

Contoh berikut menghitung jumlah tayangan setiap 5 menit menggunakan jendela tetap. Dalam contoh ini, klausa pilih menggunakan alias impressions_window, lalu jendela itu sendiri didefinisikan sebagai bagian GROUP BY dari klausa. Jendela harus didasarkan pada kolom tanda waktu yang sama dengan marka air, clickTimestamp kolom dalam contoh ini.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Contoh serupa dalam Python untuk menghitung laba melalui jendela tetap per jam:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplikasi rekaman streaming

Streaming Terstruktur memiliki jaminan pemrosesan sekali persis sekali tetapi tidak secara otomatis membatalkan duplikat rekaman dari sumber data. Misalnya, karena banyak antrean pesan memiliki setidaknya sekali jaminan, rekaman duplikat harus diharapkan saat membaca dari salah satu antrean pesan ini. Anda dapat menggunakan dropDuplicatesWithinWatermark() fungsi untuk mende-duplikat rekaman pada bidang tertentu, menghapus duplikat dari aliran bahkan jika beberapa bidang berbeda (seperti waktu peristiwa atau waktu kedatangan). Anda harus menentukan marka air untuk menggunakan fungsi .dropDuplicatesWithinWatermark() Semua data duplikat yang tiba dalam rentang waktu yang ditentukan oleh marka air dihilangkan.

Data yang diurutkan penting karena data yang tidak berurutan menyebabkan nilai marka air melompat ke depan dengan tidak benar. Kemudian, ketika data yang lebih lama tiba, data dianggap terlambat dan dihilangkan. withEventTimeOrder Gunakan opsi untuk memproses rekam jepret awal secara berurutan berdasarkan tanda waktu yang ditentukan dalam marka air. Opsi withEventTimeOrder dapat dideklarasikan dalam kode yang menentukan himpunan data atau dalam pengaturan alur menggunakan spark.databricks.delta.withEventTimeOrder.enabled. Contoh:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Catatan

Opsi withEventTimeOrder ini hanya didukung dengan Python.

Dalam contoh berikut, data diproses berdasarkan clickTimestamp, dan rekaman yang tiba dalam waktu 5 detik satu sama lain yang berisi duplikat userId dan clickAdId kolom dihilangkan.

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table(rawClicks)
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Mengoptimalkan konfigurasi alur untuk pemrosesan stateful

Untuk membantu mencegah masalah produksi dan latensi yang berlebihan, Databricks merekomendasikan untuk mengaktifkan manajemen status berbasis RocksDB untuk pemrosesan aliran stateful Anda, terutama jika pemrosesan Anda memerlukan penghematan sejumlah besar status menengah. Untuk mengaktifkan penyimpanan status RocksDB, lihat Mengaktifkan penyimpanan status RocksDB untuk Tabel Langsung Delta.