Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfa, filigran kavramlarını açıklar ve yaygın durum bilgisi içeren akış işlemlerinde filigranların kullanımına ilişkin öneriler sunar.
Akış sorguları zaman içinde durum verilerini birikir. Filigranlar, bellek hatalarını ve artan işleme gecikme süresini önlemek için eski durum verilerini otomatik olarak kaldırır.
Filigran nedir?
İşleme sırasında Yapılandırılmış Akış, mikro toplu işlemler arasında durumu korur. Akış sorguları, her mikro toplu işlemden sonra her şeyi yeniden derlemek yerine sonuçları artımlı olarak güncelleştirmek için durumu kullanır. Watermark’lar, bir sorgunun bir durum öğesini işlemeyi ne zaman durduracağını belirleyen eşik değerlerini kontrol eder.
Devlet kuruluşlarının yaygın örnekleri şunlardır:
- Bir zaman penceresindeki birikimler.
- İki akış arasındaki birleştirme işlemi için benzersiz anahtarlar.
Akışlı bir DataFrame'de filigran tanımlamak için bir zaman damgası alanı ve bir gecikme eşiği belirtin. Yeni veriler geldikçe, durum yöneticisi belirtilen alandaki en son zaman damgasını izler ve yalnızca geçlik eşiğindeki kayıtları işler.
Sorgular, her zaman eşik değeri içinde gelen kayıtları işler. Sorgular eşiğin dışına ulaşan kayıtları işlemeye devam edebilir, ancak bu garanti değildir.
Aşağıdaki örnek, pencereli sayım için 10 dakikalık bir filigran eşiği uygular.
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
Bu örnekte:
-
event_timesütunu, 10 dakikalık bir zaman damgası ve 5 dakikalık bir çarpmalı pencere tanımlamak için kullanılır. - Çakışmayan her 5 dakikalık zaman aralığında gözlemlenen her
idiçin bir sayım toplanır. - Pencerenin sonu, en son gözlemlenen
event_timedeğerinden 10 dakika daha eski olana kadar her sayım için durum bilgisi tutulur.
Önemli
groupBy() ve window() işleminde, olay zamanı işaretçisinin korunduğundan emin olmak için sütunlara adlarına göre, "<colName>" veya col("<colName>") başvurun. Scala'da da kullanabilirsiniz $colName.
Filigranlar işleme süresini ve aktarım hızını nasıl etkiler?
Çıktı modları, filigranları olan bir sorgunun havuza veri yazmasını denetler. Filigranlar, bellekteki toplam durum bilgisi miktarını azalttığı için durum bilgisi akışında aktarım hızı denetimi için gereklidir. Durum bilgisi olan tüm işlemler için tüm çıkış modları desteklenmez. Bkz. Pencereli toplulaştırmalar için filigranlar ve çıktı modu.
Filigran süresinin seçilmesinin dezavantajları vardır:
- Daha kısa watermark süreleri, sorguların daha az durum bilgisi tuttuğu ve her watermark süresi tamamlandığında sonuçları yazdığı için sorgu gecikmesini azaltır. Ancak, kısa watermark'lerin geciken verilere karşı toleransı düşüktür.
- Daha uzun watermark’ler, geç gelen verilere karşı yüksek tolerans gösterir. Ancak sorguların daha fazla durum bilgisi depolaması ve daha uzun bir filigran süresi sonrasında sonuçları yazmayı beklemesi gerektiğinden, uzun filigranlar sorgu gecikme süresini artırır.
Pencereli birleştirmeler için filigranlar ve çıktı modu
Aşağıdaki tablo, zaman damgası ve watermark üzerinde kümeleme içeren sorguların işleme davranışını göstermektedir:
| Çıkış modu | Davranış |
|---|---|
| Sonuna Ekle | Watermark eşiği aşıldıktan sonra sorgu, satırları hedef tabloya yazar. Tüm yazma işlemleri gecikme eşiğine göre geciktirilir. Eşik geçtikten sonra eski toplama durumu bırakılır. |
| Güncelleştirmek | Sonuçlar hesaplandığında sorgu, hedef tabloya satır yazar ve yeni veriler geldikçe sorgu satırları güncelleştirebilir ve üzerine yazabilir. Eşik geçtikten sonra eski toplama durumu bırakılır. |
| Tamamla | Toplama durumu bırakılmaz. Sorgu, her tetikleyici için hedef tabloyu yeniden yazar. |
Akış akışı birleşimleri için filigranlar ve çıkış modları
Birden çok akış arasındaki birleştirmeler yalnızca ekleme modunu destekler. Sorgular her toplu işlem için eşleşen kayıtları yazar.
İç birleştirmeler için Databricks, sorgunun eski kayıtlar için durum bilgisini silebilmesini sağlamak amacıyla her bir akış veri kaynağı için bir watermark eşiği ayarlamanızı önerir. Watermark olmadan Structured Streaming, her tetiklemede join işleminin her iki tarafındaki tüm anahtarları eşleştirmeye çalışır; bu da performansı etkileyebilir.
Dış birleştirmeler için filigranlama zorunludur. Bir kayıt eşleşmediğinde, sorgu bu anahtar için null bir değer yazar. Birleştirmeler yalnızca ekleme modunu desteklediğinden, geçlik eşiği geçene kadar eşleşmeyen kayıtlar yazılmaz.
Birden çok watermark ilkesiyle geç veri eşiğini denetleyin
Birden çok Structured Streaming girdisi için, geç gelen verilerdeki tolerans eşiklerini kontrol etmek amacıyla birden çok gecikme işareti ayarlayabilirsiniz. Filigranlar, durum bilgilerini ve gecikme süresini denetlemenize olanak sağlar.
Akış sorgusu, birleştirilmiş veya iliştirilmiş birden çok giriş akışına sahip olabilir. Durum bilgisi olan işlemler için, giriş akışlarının her biri geç veri toleransı için farklı bir eşik gerektirebilir. Her giriş akışında kullanarak withWatermark("eventTime", delay) bu eşikleri belirtin. Aşağıda stream-stream birleşimlerine sahip örnek bir sorgu verilmiştir.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Yapılandırılmış Akış, sorguyu durum bilgisi olan işlemlerle çalıştırırken her giriş akışı için en fazla olay süresini ayrı ayrı izler, ilgili gecikmeye göre filigranları hesaplar ve tek bir genel filigran belirler. Varsayılan olarak, Structured Streaming genel filigran olarak en düşük değeri kullanır. Bir akış diğer akışların gerisinde kalırsa, minimum genel watermark sorgunun verileri yanlışlıkla gecikmiş olarak işaretlemesini engeller. Örneğin, akışlardan biri yukarı akış hataları nedeniyle veri almayı durdurduğunda bu durum oluşabilir. Genel watermark, en yavaş akışın hızında güvenli bir şekilde ilerler ve gerektiğinde sorgu çıktısını geciktirir.
Gecikmeyi azaltmak için, en hızlı akışın filigranını genel filigran olarak kullanmak üzere spark.sql.streaming.multipleWatermarkPolicy değerini max olarak ayarlayın (varsayılan değer: min). Ancak, bu yapılandırma en yavaş akışlardan gelen verileri düşürür. Databricks, bu yapılandırmayı dikkatli bir şekilde uygulamanızı önerir.
Filigranları farklı işlemlere uygulama
İşlem, distinct durumundaki tüm benzersiz kayıtları izler. Watermark olmadan durum sürekli büyür ve bellek sorunlarına yol açabilir. Durumu sınırlandırmak ve eşik aşıldıktan sonra eski kayıtları kaldırmak için bir zaman damgası alanında watermark belirtin.
Aşağıdaki örnek, bir distinct işlemi üzerinde filigran uygular:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Bu örnekte akış sorgusu, en son gözlemlenenden eventTimeitibaren 1 saat içinde gelen yinelenen kayıtları kaldırır. Sorgu, eşik aşıldıktan sonra tekilleştirme için durum bilgisini atar.
Önemli
Tüm sütunlar yerine belirli sütunlardaki yinelenenleri kaldırmak için distinct yerine dropDuplicates() veya dropDuplicatesWithinWatermark() kullanın. Bkz Filigran içinde yinelenenleri kaldırma.
Filigran içindeki yinelenenleri kaldır
Databricks Runtime 13.3 LTS veya sonraki sürümlerde, bir watermark eşiği içinde kayıtların yinelemelerini kaldırmak için benzersiz bir tanımlayıcı kullanabilirsiniz.
Structured Streaming, tam olarak bir kez işlemeyi garanti eder, ancak veri kaynaklarından gelen kayıtların yinelemelerini kaldırmaz.
dropDuplicatesWithinWatermark, olay zamanı ya da varış saati gibi alanlar yinelenen kayıtlar arasında farklı olsa bile, herhangi bir alandaki yinelenen kayıtları kaldırmak için kullanılır.
dropDuplicatesWithinWatermark ile sorgular, watermark eşiği içinde gelen kayıtların mükerrerlerini her zaman kaldırır. Sorgular, eşik süresinin dışında gelen kayıtlar içindeki mükerrerleri de ayıklayabilir, ancak bu garanti edilmez. Sorguların tüm yinelenenleri bırakmasını garanti etmek için, eşik eşiğini yinelenen olaylar arasındaki en büyük zaman damgası farkından büyük olacak şekilde ayarlayın.
dropDuplicatesWithinWatermark yöntemini kullanmak için bir filigran belirtmelisiniz:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Kullanım örneği örnekleri
Aşağıdaki örneklerde gelişmiş pencere kullanımı örnekleri gösterilmektedir:
Saatlik satış toplamlarını hesaplamak için atlayan pencereleri kullanma
Atlayan pencereler, çakışmayan aralıklarla sabit boyutlu olur. Her giriş satırı tam olarak bir pencereye aittir. Saatlik satış toplamları gibi ayrık zaman aralığı toplamalarını hesaplamak için atlayan pencereleri kullanın:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Bu örnekte:
-
window("timestamp", "1 hour")siparişleri, sabah 5 ile 6 arası ve sabah 6 ile 7 arası gibi çakışmayan 1 saatlik aralıklarda gruplandırır. -
withWatermark("timestamp", "1 hour")her pencerenin toplam değerini, pencerenin bitiş zaman damgası maksimum sipariş zaman damgasından 1 saat daha eski olana kadar durumda tutar.
Hareketli toplamaları hesaplamak için kayan pencereleri kullanın
Kayan pencereler, çakışabilen aralıklarla sabit boyuttadır. Tek bir satır birden çok pencereye ait olabilir. Kayan pencereleri kullanarak 6 saatlik bir zaman aralığındaki satışlar gibi sıralı toplamaları hesap edin:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Bu örnekte:
-
window("timestamp", "6 hours", slideDuration="1 hour")siparişleri 1 saat ilerleterek 6 saatlik aralıklar halinde gruplandırma; örneğin, 05-11:00 ve 06:00 ile 12:00 arasında. -
withWatermark("timestamp", "1 hour")pencere bitiş zaman damgası maksimum sipariş zaman damgasından 1 saat daha eski hâle gelene kadar her pencerenin toplulaştırma sonucunu durumda tutar. -
slideDurationdeğerinden küçük veya eşitwindowDurationolmalıdır.
Kullanıcı etkinliğini denetlemek için oturum pencerelerini kullanma
Oturum pencerelerinin boyutu sabit değildir. Bir satır geldiğinde bir pencere açılır ve yeni satır gelmeyen bir boşluk süresinin ardından kapanır. Kullanıcının 30 dakikalık bir süre içindeki sayfa görünümleri gibi uzun boş dönemler arasındaki etkinlik artışlarını toplamak için oturum pencerelerini kullanın:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
Bu örnekte:
-
session_window("timestamp", gapDuration="30 minutes")ilk sayfa görünümü geldiğinde bir pencere açar. 30 dakika içinde gelen sonraki her sayfa görünümü pencereyi genişletir. 30 dakika içinde hiçbir sayfa görünümü gelmediğinde, pencere kapanır ve sonraki sayfa görünümü yeni bir pencere başlatır. -
withWatermark("timestamp", "1 hour"), pencere bitiş zaman damgası maksimum sayfa görüntüleme zaman damgasından 1 saat daha eski olana kadar her oturumun birikimini durumda tutar. -
window()vesession_window()içintimeColumnbağımsız değişkeni,TimestampTypeveyaTimestampNTZTypetüründe olmalıdır. - Pencereleri olay zamanı yerine işleme süresine göre tanımlamak için kullanın
current_timestamp(). - Pencere sürelerini mikrosaniyeden günlere kadar ayarlayabilirsiniz. Ay süreleri ve daha uzun süreler desteklenmez.
- Tüm pencere durumlarını süresiz olarak korumak için pencereli toplulaştırmalarla
completeçıkış modunu kullanın. Büyük veri kümelerinde durum büyümesini sınırlamak ve bellek sorunlarını önlemek için uygun bir filigranlaappendçıkış modunu kullanın. Çıkış modu davranışı hakkında daha fazla ayrıntı için Pencereli toplamalar için watermark'lar ve çıkış modu bölümüne bakın.