Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie opisano pojęcia związane z znakami wodnymi i przedstawiono zalecenia dotyczące używania znaków wodnych w typowych operacjach przesyłania strumieniowego stanowego.
Zapytania przesyłane strumieniowo gromadzą dane stanu w czasie. Znaki wodne automatycznie usuwają stare dane stanu, aby zapobiec błędom pamięci i zwiększonemu opóźnieniu przetwarzania.
Co to jest znak wodny?
Podczas przetwarzania Structured Streaming utrzymuje stan między mikropartiami. Zapytania strumieniowe wykorzystują stan do przyrostowego aktualizowania wyników zamiast ponownego przeliczania wszystkiego po każdej mikropartii. Znaki wodne kontrolują próg, gdy zapytanie zatrzymuje przetwarzanie jednostki stanu.
Typowe przykłady jednostek stanu to:
- Agregacje w przedziale czasu.
- Unikatowe klucze w sprzężeniach między dwoma strumieniami.
Aby zadeklarować znak wodny w ramce danych przesyłania strumieniowego, określ pole znacznika czasu i próg opóźnienia. Po nadejściu nowych danych menedżer stanu śledzi najnowszy znacznik czasu w określonym polu i przetwarza tylko rekordy w ramach progu opóźnienia.
Zapytania zawsze przetwarzają rekordy, które napływają w wyznaczonym progu. Zapytania mogą nadal przetwarzać rekordy, które docierają poza tym progiem, ale nie jest to gwarantowane.
Poniższy przykład stosuje próg limitu 10 minut do liczby okien:
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
W tym przykładzie:
- Kolumna
event_timesłuży do definiowania 10-minutowego znacznika wody i 5-minutowego przewijanego okna. - Zliczana jest liczba dla każdego zaobserwowanego elementu
idw każdym nienakładającym się 5-minutowym oknie. - Informacje o stanie są przechowywane dla każdego zliczenia do momentu, gdy koniec okna będzie o 10 minut wcześniejszy niż najnowsze obserwowane
event_time.
Ważne
W operacji groupBy() i window() odwołuj się do kolumn według nazwy, "<colName>" lub col("<colName>"), aby upewnić się, że znacznik czasu zdarzenia zostanie zachowany. W języku Scala można również użyć polecenia $colName.
Jak znaki wodne wpływają na czas przetwarzania i przepływność?
Tryby wyjścia określają, kiedy zapytanie ze znacznikami wodnymi zapisuje dane do ujścia. Znaki wodne są niezbędne do kontrolowania przepływności w strumieniu stanowym, ponieważ zmniejszają łączną ilość informacji o stanie w pamięci. Nie wszystkie tryby wyjściowe są obsługiwane dla wszystkich operacji stanowych. Zobacz Znaczniki wodne i tryb wyjścia dla agregacji okienkowych.
Wybór czasu trwania znaku wodnego wiąże się z pewnymi kompromisami:
- Krótsze znaczniki wodne obniżają opóźnienie zapytań, ponieważ zapytania przechowują mniej informacji o stanie i zapisują wyniki po upływie każdego interwału znacznika wodnego. Jednak niewielkie watermarki słabo tolerują opóźnione dane.
- Dłuższe znaki wodne mają wysoką tolerancję późnych danych. Jednak długie watermarki zwiększają opóźnienia zapytań, ponieważ zapytania muszą przechowywać więcej informacji o stanie i czekać z zapisaniem wyników do upływu dłuższego czasu watermarku.
Znaczniki wodne i tryb wyjściowy dla agregacji okienkowych
W poniższej tabeli przedstawiono zachowanie przetwarzania zapytań z agregacją na znaczniku czasu i znakiem wodnym:
| Tryb wyjściowy | Zachowanie |
|---|---|
| Dołączanie | Zapytanie zapisuje wiersze do tabeli docelowej po przekroczeniu progu watermark. Wszystkie zapisy są opóźnione na podstawie progu latencji. Stary stan agregacji jest porzucany po osiągnięciu progu. |
| Zaktualizuj | Zapytanie zapisuje wiersze w tabeli docelowej w miarę obliczania wyników, a zapytanie może aktualizować i zastępować wiersze w miarę nadejścia nowych danych. Stary stan agregacji jest porzucany po osiągnięciu progu. |
| Ukończ | Stan agregacji nie jest utracony. Zapytanie ponownie zapisuje tabelę docelową dla każdego wyzwalacza. |
Znaczniki wodne i tryby wyjściowe dla złączeń strumień–strumień
Sprzężenia między wieloma strumieniami obsługują tylko tryb dołączania. Zapytania zapisują pasujące rekordy dla każdej partii.
W przypadku sprzężeń wewnętrznych usługa Databricks zaleca ustawienie progu limitu dla każdego źródła danych przesyłania strumieniowego, aby umożliwić zapytaniu odrzucenie informacji o stanie starych rekordów. Bez znaczników wodnych Structured Streaming próbuje przy każdym wyzwoleniu połączyć każdy klucz z obu stron złączenia, co może wpływać na wydajność.
W przypadku sprzężeń zewnętrznych watermarking jest obowiązkowy. Gdy rekord jest niezgodny, zapytanie zapisuje wartość null dla tego klucza. Ponieważ złączenia obsługują tylko tryb dopisywania, niedopasowane rekordy nie są zapisywane, dopóki nie upłynie próg dopuszczalnego opóźnienia.
Kontrolowanie progu opóźnionych danych przy użyciu wielu zasad znaków wodnych
W przypadku wielu wejść Structured Streaming można ustawić wiele znaczników wodnych, aby kontrolować progi tolerancji dla opóźnionych danych. Watermarki umożliwiają sterowanie informacjami o stanie i opóźnieniem.
Zapytanie przesyłane strumieniowo może mieć wiele strumieni wejściowych, które są połączone lub złączone. W przypadku operacji stanowych każdy ze strumieni wejściowych może wymagać innego progu tolerancji dla spóźnionych danych. Określ te progi przy użyciu withWatermark("eventTime", delay) dla każdego strumienia wejściowego. Poniżej przedstawiono przykładowe zapytanie z łączeniami strumieni-strumieni.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Podczas uruchamiania zapytania z operacjami stanowymi usługa Structured Streaming indywidualnie śledzi maksymalny czas zdarzenia dla każdego strumienia wejściowego, oblicza znaki wodne na podstawie odpowiedniego opóźnienia i określa pojedynczy globalny znak wodny. Domyślnie przesyłanie strumieniowe ze strukturą używa wartości minimalnej jako globalnego limitu. Jeśli jeden ze strumieni zacznie pozostawać w tyle za pozostałymi, minimalny globalny znacznik wodny zapobiega przypadkowemu oznaczeniu danych jako spóźnionych przez zapytanie. Na przykład może się tak zdarzyć, gdy jeden ze strumieni przestaje otrzymywać dane z powodu awarii po stronie źródła. Globalny znacznik wodny bezpiecznie przesuwa się w tempie najwolniejszego strumienia i w razie potrzeby opóźnia wynik zapytania.
Aby zmniejszyć opóźnienie, ustaw spark.sql.streaming.multipleWatermarkPolicy na max (wartość domyślna to min), aby jako globalnego znacznika wodnego użyć znacznika wodnego najszybszego strumienia. Jednak ta konfiguracja usuwa dane z najwolniejszych strumieni. Usługa Databricks zaleca stosowanie tej konfiguracji z ostrożnością.
Stosowanie znaków wodnych do odrębnych operacji
Operacja distinct rejestruje każdy unikalny rekord w stanie. Bez znaku wodnego stan rośnie w nieskończoność i może powodować problemy z pamięcią. Określ znak wodny w polu sygnatury czasowej, aby powiązać stan i usunąć stare rekordy po osiągnięciu progu.
Poniższy przykład stosuje znak wodny dla operacji distinct:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
W tym przykładzie zapytanie strumieniowe usuwa zduplikowane rekordy, które napływają w ciągu 1 godziny od ostatnio zaobserwowanego eventTime. Zapytanie odrzuca informacje o stanie deduplikacji po osiągnięciu progu.
Ważne
Aby deduplikować określone kolumny zamiast wszystkich kolumn, użyj polecenia dropDuplicates() lub dropDuplicatesWithinWatermark() zamiast distinct. Zobacz Usuwanie duplikatów w znaku wodnym.
Usuwanie duplikatów w znaku wodnym
W środowisku Databricks Runtime 13.3 LTS lub nowszym można użyć unikalnego identyfikatora do usuwania duplikatów rekordów w obrębie progu watermark.
Przesyłanie strumieniowe ze strukturą gwarantuje dokładnie jednokrotne przetwarzanie, ale nie deduplikuje rekordów ze źródeł danych. Służy dropDuplicatesWithinWatermark do usuwania duplikatów w dowolnym polu, nawet jeśli pola różnią się między zduplikowanymi rekordami, takimi jak czas zdarzenia lub czas przybycia.
W przypadku funkcji dropDuplicatesWithinWatermark zapytania zawsze usuwają zduplikowane rekordy, które docierają w obrębie progu znacznika wodnego. Zapytania mogą również deduplikować rekordy, które napływają po upływie tego progu, ale nie ma na to gwarancji. Aby zagwarantować, że zapytania porzucają wszystkie duplikaty, ustaw próg limitu na większy niż maksymalna różnica sygnatury czasowej między zduplikowanymi zdarzeniami.
Aby użyć metody dropDuplicatesWithinWatermark, należy określić znak wodny:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Przykłady przypadków użycia
W poniższych przykładach przedstawiono zaawansowane przypadki użycia okien:
Użyj okien stałoczasowych, aby obliczyć godzinowe sumy sprzedaży
Okna stałe mają stały rozmiar i niepokrywające się przedziały. Każdy wiersz wejściowy należy do dokładnie jednego okna. Użyj okien przeskakujących, aby obliczać agregacje dla odrębnych przedziałów czasu, takie jak godzinowe sumy sprzedaży:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
W tym przykładzie:
-
window("timestamp", "1 hour")grupuje zamówienia na nienakładające się interwały 1-godzinne, takie jak od 5 do 6:00 i od 6 do 7:00. -
withWatermark("timestamp", "1 hour")przechowuje stan agregacji każdego okna, dopóki znacznik czasu zakończenia okna nie jest o 1 godzinę wcześniejszy od maksymalnego znacznika czasu zamówienia.
Użyj okien przesuwnych do obliczania agregatów kroczących
Okna przesuwne mają stały rozmiar i przedziały, które mogą się nakładać. Pojedynczy wiersz może należeć do wielu okien. Użyj okien przesuwnych, aby obliczyć agregacje stopniowe, takie jak sprzedaż w okresie 6-godzinnym:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
W tym przykładzie:
-
window("timestamp", "6 hours", slideDuration="1 hour")grupuje zamówienia w 6-godzinne przedziały, które przesuwają się co 1 godzinę, na przykład od 5:00 do 11:00 oraz od 6:00 do 12:00. -
withWatermark("timestamp", "1 hour")program przechowuje agregację każdego okna w stanie, dopóki sygnatura czasowa zakończenia okna jest 1 godzina starsza niż maksymalna sygnatura czasowa zamówienia. -
slideDurationwartość musi być mniejsza lub równawindowDuration.
Sprawdzanie aktywności użytkownika za pomocą okien sesji
Okna sesji nie mają stałego rozmiaru. Okno otwiera się, gdy nadejdzie wiersz, i zamyka się po upływie przerwy, w której nie pojawiają się żadne nowe wiersze. Użyj okien sesji, aby agregować wzrosty aktywności między długimi okresami bezczynności, takimi jak wyświetlenia stron użytkownika w ciągu 30 minut:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
W tym przykładzie:
-
session_window("timestamp", gapDuration="30 minutes")Otwiera okno po nadejściu pierwszego widoku strony. Każdy kolejny widok strony, który pojawia się w ciągu 30 minut, rozszerza okno. Gdy żaden widok strony nie pojawi się w ciągu 30 minut, okno zostanie zamknięte, a następny widok strony uruchomi nowe okno. -
withWatermark("timestamp", "1 hour")przechowuje zagregowane dane każdej sesji w pamięci stanu, dopóki znacznik czasu zakończenia okna nie będzie o 1 godzinę wcześniejszy niż maksymalny znacznik czasu odsłony strony. - Argument
timeColumndlawindow()isession_window()musi być typuTimestampTypelubTimestampNTZType. - Służy
current_timestamp()do definiowania okien na podstawie czasu przetwarzania, a nie czasu zdarzenia. - Czasy trwania okien można ustawić z mikrosekund do dni. Czasy trwania miesiąca i dłuższe nie są obsługiwane.
- Użyj
completetrybu danych wyjściowych z agregacjami okien, aby zachować cały stan okna na czas nieokreślony. Użyj trybu wyjściowegoappendz odpowiednim znakiem wodnym, aby ograniczyć przyrost stanu i zapobiec problemom z pamięcią w przypadku dużych zbiorów danych. Aby uzyskać więcej informacji na temat zachowania trybu danych wyjściowych, zobacz Watermarks and output mode for windowed aggregations (Znaki wodne i tryb danych wyjściowych dla agregacji okien).