Veri işleme eşiklerini denetlemek için filigranları uygulama

Bu sayfa, filigran kavramlarını açıklar ve yaygın durum bilgisi içeren akış işlemlerinde filigranların kullanımına ilişkin öneriler sunar.

Akış sorguları zaman içinde durum verilerini birikir. Filigranlar, bellek hatalarını ve artan işleme gecikme süresini önlemek için eski durum verilerini otomatik olarak kaldırır.

Filigran nedir?

İşleme sırasında Yapılandırılmış Akış, mikro toplu işlemler arasında durumu korur. Akış sorguları, her mikro toplu işlemden sonra her şeyi yeniden derlemek yerine sonuçları artımlı olarak güncelleştirmek için durumu kullanır. Watermark’lar, bir sorgunun bir durum öğesini işlemeyi ne zaman durduracağını belirleyen eşik değerlerini kontrol eder.

Devlet kuruluşlarının yaygın örnekleri şunlardır:

  • Bir zaman penceresindeki birikimler.
  • İki akış arasındaki birleştirme işlemi için benzersiz anahtarlar.

Akışlı bir DataFrame'de filigran tanımlamak için bir zaman damgası alanı ve bir gecikme eşiği belirtin. Yeni veriler geldikçe, durum yöneticisi belirtilen alandaki en son zaman damgasını izler ve yalnızca geçlik eşiğindeki kayıtları işler.

Sorgular, her zaman eşik değeri içinde gelen kayıtları işler. Sorgular eşiğin dışına ulaşan kayıtları işlemeye devam edebilir, ancak bu garanti değildir.

Aşağıdaki örnek, pencereli sayım için 10 dakikalık bir filigran eşiği uygular.

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

Bu örnekte:

  • event_time sütunu, 10 dakikalık bir zaman damgası ve 5 dakikalık bir çarpmalı pencere tanımlamak için kullanılır.
  • Çakışmayan her 5 dakikalık zaman aralığında gözlemlenen her id için bir sayım toplanır.
  • Pencerenin sonu, en son gözlemlenen event_time değerinden 10 dakika daha eski olana kadar her sayım için durum bilgisi tutulur.

Önemli

groupBy() ve window() işleminde, olay zamanı işaretçisinin korunduğundan emin olmak için sütunlara adlarına göre, "<colName>" veya col("<colName>") başvurun. Scala'da da kullanabilirsiniz $colName.

Filigranlar işleme süresini ve aktarım hızını nasıl etkiler?

Çıktı modları, filigranları olan bir sorgunun havuza veri yazmasını denetler. Filigranlar, bellekteki toplam durum bilgisi miktarını azalttığı için durum bilgisi akışında aktarım hızı denetimi için gereklidir. Durum bilgisi olan tüm işlemler için tüm çıkış modları desteklenmez. Bkz. Pencereli toplulaştırmalar için filigranlar ve çıktı modu.

Filigran süresinin seçilmesinin dezavantajları vardır:

  • Daha kısa watermark süreleri, sorguların daha az durum bilgisi tuttuğu ve her watermark süresi tamamlandığında sonuçları yazdığı için sorgu gecikmesini azaltır. Ancak, kısa watermark'lerin geciken verilere karşı toleransı düşüktür.
  • Daha uzun watermark’ler, geç gelen verilere karşı yüksek tolerans gösterir. Ancak sorguların daha fazla durum bilgisi depolaması ve daha uzun bir filigran süresi sonrasında sonuçları yazmayı beklemesi gerektiğinden, uzun filigranlar sorgu gecikme süresini artırır.

Pencereli birleştirmeler için filigranlar ve çıktı modu

Aşağıdaki tablo, zaman damgası ve watermark üzerinde kümeleme içeren sorguların işleme davranışını göstermektedir:

Çıkış modu Davranış
Sonuna Ekle Watermark eşiği aşıldıktan sonra sorgu, satırları hedef tabloya yazar. Tüm yazma işlemleri gecikme eşiğine göre geciktirilir. Eşik geçtikten sonra eski toplama durumu bırakılır.
Güncelleştirmek Sonuçlar hesaplandığında sorgu, hedef tabloya satır yazar ve yeni veriler geldikçe sorgu satırları güncelleştirebilir ve üzerine yazabilir. Eşik geçtikten sonra eski toplama durumu bırakılır.
Tamamla Toplama durumu bırakılmaz. Sorgu, her tetikleyici için hedef tabloyu yeniden yazar.

Akış akışı birleşimleri için filigranlar ve çıkış modları

Birden çok akış arasındaki birleştirmeler yalnızca ekleme modunu destekler. Sorgular her toplu işlem için eşleşen kayıtları yazar.

İç birleştirmeler için Databricks, sorgunun eski kayıtlar için durum bilgisini silebilmesini sağlamak amacıyla her bir akış veri kaynağı için bir watermark eşiği ayarlamanızı önerir. Watermark olmadan Structured Streaming, her tetiklemede join işleminin her iki tarafındaki tüm anahtarları eşleştirmeye çalışır; bu da performansı etkileyebilir.

Dış birleştirmeler için filigranlama zorunludur. Bir kayıt eşleşmediğinde, sorgu bu anahtar için null bir değer yazar. Birleştirmeler yalnızca ekleme modunu desteklediğinden, geçlik eşiği geçene kadar eşleşmeyen kayıtlar yazılmaz.

Birden çok watermark ilkesiyle geç veri eşiğini denetleyin

Birden çok Structured Streaming girdisi için, geç gelen verilerdeki tolerans eşiklerini kontrol etmek amacıyla birden çok gecikme işareti ayarlayabilirsiniz. Filigranlar, durum bilgilerini ve gecikme süresini denetlemenize olanak sağlar.

Akış sorgusu, birleştirilmiş veya iliştirilmiş birden çok giriş akışına sahip olabilir. Durum bilgisi olan işlemler için, giriş akışlarının her biri geç veri toleransı için farklı bir eşik gerektirebilir. Her giriş akışında kullanarak withWatermark("eventTime", delay) bu eşikleri belirtin. Aşağıda stream-stream birleşimlerine sahip örnek bir sorgu verilmiştir.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Yapılandırılmış Akış, sorguyu durum bilgisi olan işlemlerle çalıştırırken her giriş akışı için en fazla olay süresini ayrı ayrı izler, ilgili gecikmeye göre filigranları hesaplar ve tek bir genel filigran belirler. Varsayılan olarak, Structured Streaming genel filigran olarak en düşük değeri kullanır. Bir akış diğer akışların gerisinde kalırsa, minimum genel watermark sorgunun verileri yanlışlıkla gecikmiş olarak işaretlemesini engeller. Örneğin, akışlardan biri yukarı akış hataları nedeniyle veri almayı durdurduğunda bu durum oluşabilir. Genel watermark, en yavaş akışın hızında güvenli bir şekilde ilerler ve gerektiğinde sorgu çıktısını geciktirir.

Gecikmeyi azaltmak için, en hızlı akışın filigranını genel filigran olarak kullanmak üzere spark.sql.streaming.multipleWatermarkPolicy değerini max olarak ayarlayın (varsayılan değer: min). Ancak, bu yapılandırma en yavaş akışlardan gelen verileri düşürür. Databricks, bu yapılandırmayı dikkatli bir şekilde uygulamanızı önerir.

Filigranları farklı işlemlere uygulama

İşlem, distinct durumundaki tüm benzersiz kayıtları izler. Watermark olmadan durum sürekli büyür ve bellek sorunlarına yol açabilir. Durumu sınırlandırmak ve eşik aşıldıktan sonra eski kayıtları kaldırmak için bir zaman damgası alanında watermark belirtin.

Aşağıdaki örnek, bir distinct işlemi üzerinde filigran uygular:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

Bu örnekte akış sorgusu, en son gözlemlenenden eventTimeitibaren 1 saat içinde gelen yinelenen kayıtları kaldırır. Sorgu, eşik aşıldıktan sonra tekilleştirme için durum bilgisini atar.

Önemli

Tüm sütunlar yerine belirli sütunlardaki yinelenenleri kaldırmak için distinct yerine dropDuplicates() veya dropDuplicatesWithinWatermark() kullanın. Bkz Filigran içinde yinelenenleri kaldırma.

Filigran içindeki yinelenenleri kaldır

Databricks Runtime 13.3 LTS veya sonraki sürümlerde, bir watermark eşiği içinde kayıtların yinelemelerini kaldırmak için benzersiz bir tanımlayıcı kullanabilirsiniz.

Structured Streaming, tam olarak bir kez işlemeyi garanti eder, ancak veri kaynaklarından gelen kayıtların yinelemelerini kaldırmaz. dropDuplicatesWithinWatermark, olay zamanı ya da varış saati gibi alanlar yinelenen kayıtlar arasında farklı olsa bile, herhangi bir alandaki yinelenen kayıtları kaldırmak için kullanılır.

dropDuplicatesWithinWatermark ile sorgular, watermark eşiği içinde gelen kayıtların mükerrerlerini her zaman kaldırır. Sorgular, eşik süresinin dışında gelen kayıtlar içindeki mükerrerleri de ayıklayabilir, ancak bu garanti edilmez. Sorguların tüm yinelenenleri bırakmasını garanti etmek için, eşik eşiğini yinelenen olaylar arasındaki en büyük zaman damgası farkından büyük olacak şekilde ayarlayın.

dropDuplicatesWithinWatermark yöntemini kullanmak için bir filigran belirtmelisiniz:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

Kullanım örneği örnekleri

Aşağıdaki örneklerde gelişmiş pencere kullanımı örnekleri gösterilmektedir:

Saatlik satış toplamlarını hesaplamak için atlayan pencereleri kullanma

Atlayan pencereler, çakışmayan aralıklarla sabit boyutlu olur. Her giriş satırı tam olarak bir pencereye aittir. Saatlik satış toplamları gibi ayrık zaman aralığı toplamalarını hesaplamak için atlayan pencereleri kullanın:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

Bu örnekte:

  • window("timestamp", "1 hour") siparişleri, sabah 5 ile 6 arası ve sabah 6 ile 7 arası gibi çakışmayan 1 saatlik aralıklarda gruplandırır.
  • withWatermark("timestamp", "1 hour") her pencerenin toplam değerini, pencerenin bitiş zaman damgası maksimum sipariş zaman damgasından 1 saat daha eski olana kadar durumda tutar.

Hareketli toplamaları hesaplamak için kayan pencereleri kullanın

Kayan pencereler, çakışabilen aralıklarla sabit boyuttadır. Tek bir satır birden çok pencereye ait olabilir. Kayan pencereleri kullanarak 6 saatlik bir zaman aralığındaki satışlar gibi sıralı toplamaları hesap edin:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

Bu örnekte:

  • window("timestamp", "6 hours", slideDuration="1 hour") siparişleri 1 saat ilerleterek 6 saatlik aralıklar halinde gruplandırma; örneğin, 05-11:00 ve 06:00 ile 12:00 arasında.
  • withWatermark("timestamp", "1 hour") pencere bitiş zaman damgası maksimum sipariş zaman damgasından 1 saat daha eski hâle gelene kadar her pencerenin toplulaştırma sonucunu durumda tutar.
  • slideDuration değerinden küçük veya eşit windowDurationolmalıdır.

Kullanıcı etkinliğini denetlemek için oturum pencerelerini kullanma

Oturum pencerelerinin boyutu sabit değildir. Bir satır geldiğinde bir pencere açılır ve yeni satır gelmeyen bir boşluk süresinin ardından kapanır. Kullanıcının 30 dakikalık bir süre içindeki sayfa görünümleri gibi uzun boş dönemler arasındaki etkinlik artışlarını toplamak için oturum pencerelerini kullanın:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

Bu örnekte:

  • session_window("timestamp", gapDuration="30 minutes") ilk sayfa görünümü geldiğinde bir pencere açar. 30 dakika içinde gelen sonraki her sayfa görünümü pencereyi genişletir. 30 dakika içinde hiçbir sayfa görünümü gelmediğinde, pencere kapanır ve sonraki sayfa görünümü yeni bir pencere başlatır.
  • withWatermark("timestamp", "1 hour"), pencere bitiş zaman damgası maksimum sayfa görüntüleme zaman damgasından 1 saat daha eski olana kadar her oturumun birikimini durumda tutar.
  • window() ve session_window() için timeColumn bağımsız değişkeni, TimestampType veya TimestampNTZType türünde olmalıdır.
  • Pencereleri olay zamanı yerine işleme süresine göre tanımlamak için kullanın current_timestamp() .
  • Pencere sürelerini mikrosaniyeden günlere kadar ayarlayabilirsiniz. Ay süreleri ve daha uzun süreler desteklenmez.
  • Tüm pencere durumlarını süresiz olarak korumak için pencereli toplulaştırmalarla complete çıkış modunu kullanın. Büyük veri kümelerinde durum büyümesini sınırlamak ve bellek sorunlarını önlemek için uygun bir filigranla append çıkış modunu kullanın. Çıkış modu davranışı hakkında daha fazla ayrıntı için Pencereli toplamalar için watermark'lar ve çıkış modu bölümüne bakın.