Stosuj znaki wodne, aby kontrolować progi przetwarzania danych

Na tej stronie opisano pojęcia związane z znakami wodnymi i przedstawiono zalecenia dotyczące używania znaków wodnych w typowych operacjach przesyłania strumieniowego stanowego.

Zapytania przesyłane strumieniowo gromadzą dane stanu w czasie. Znaki wodne automatycznie usuwają stare dane stanu, aby zapobiec błędom pamięci i zwiększonemu opóźnieniu przetwarzania.

Co to jest znak wodny?

Podczas przetwarzania Structured Streaming utrzymuje stan między mikropartiami. Zapytania strumieniowe wykorzystują stan do przyrostowego aktualizowania wyników zamiast ponownego przeliczania wszystkiego po każdej mikropartii. Znaki wodne kontrolują próg, gdy zapytanie zatrzymuje przetwarzanie jednostki stanu.

Typowe przykłady jednostek stanu to:

  • Agregacje w przedziale czasu.
  • Unikatowe klucze w sprzężeniach między dwoma strumieniami.

Aby zadeklarować znak wodny w ramce danych przesyłania strumieniowego, określ pole znacznika czasu i próg opóźnienia. Po nadejściu nowych danych menedżer stanu śledzi najnowszy znacznik czasu w określonym polu i przetwarza tylko rekordy w ramach progu opóźnienia.

Zapytania zawsze przetwarzają rekordy, które napływają w wyznaczonym progu. Zapytania mogą nadal przetwarzać rekordy, które docierają poza tym progiem, ale nie jest to gwarantowane.

Poniższy przykład stosuje próg limitu 10 minut do liczby okien:

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

W tym przykładzie:

  • Kolumna event_time służy do definiowania 10-minutowego znacznika wody i 5-minutowego przewijanego okna.
  • Zliczana jest liczba dla każdego zaobserwowanego elementu id w każdym nienakładającym się 5-minutowym oknie.
  • Informacje o stanie są przechowywane dla każdego zliczenia do momentu, gdy koniec okna będzie o 10 minut wcześniejszy niż najnowsze obserwowane event_time.

Ważne

W operacji groupBy() i window() odwołuj się do kolumn według nazwy, "<colName>" lub col("<colName>"), aby upewnić się, że znacznik czasu zdarzenia zostanie zachowany. W języku Scala można również użyć polecenia $colName.

Jak znaki wodne wpływają na czas przetwarzania i przepływność?

Tryby wyjścia określają, kiedy zapytanie ze znacznikami wodnymi zapisuje dane do ujścia. Znaki wodne są niezbędne do kontrolowania przepływności w strumieniu stanowym, ponieważ zmniejszają łączną ilość informacji o stanie w pamięci. Nie wszystkie tryby wyjściowe są obsługiwane dla wszystkich operacji stanowych. Zobacz Znaczniki wodne i tryb wyjścia dla agregacji okienkowych.

Wybór czasu trwania znaku wodnego wiąże się z pewnymi kompromisami:

  • Krótsze znaczniki wodne obniżają opóźnienie zapytań, ponieważ zapytania przechowują mniej informacji o stanie i zapisują wyniki po upływie każdego interwału znacznika wodnego. Jednak niewielkie watermarki słabo tolerują opóźnione dane.
  • Dłuższe znaki wodne mają wysoką tolerancję późnych danych. Jednak długie watermarki zwiększają opóźnienia zapytań, ponieważ zapytania muszą przechowywać więcej informacji o stanie i czekać z zapisaniem wyników do upływu dłuższego czasu watermarku.

Znaczniki wodne i tryb wyjściowy dla agregacji okienkowych

W poniższej tabeli przedstawiono zachowanie przetwarzania zapytań z agregacją na znaczniku czasu i znakiem wodnym:

Tryb wyjściowy Zachowanie
Dołączanie Zapytanie zapisuje wiersze do tabeli docelowej po przekroczeniu progu watermark. Wszystkie zapisy są opóźnione na podstawie progu latencji. Stary stan agregacji jest porzucany po osiągnięciu progu.
Zaktualizuj Zapytanie zapisuje wiersze w tabeli docelowej w miarę obliczania wyników, a zapytanie może aktualizować i zastępować wiersze w miarę nadejścia nowych danych. Stary stan agregacji jest porzucany po osiągnięciu progu.
Ukończ Stan agregacji nie jest utracony. Zapytanie ponownie zapisuje tabelę docelową dla każdego wyzwalacza.

Znaczniki wodne i tryby wyjściowe dla złączeń strumień–strumień

Sprzężenia między wieloma strumieniami obsługują tylko tryb dołączania. Zapytania zapisują pasujące rekordy dla każdej partii.

W przypadku sprzężeń wewnętrznych usługa Databricks zaleca ustawienie progu limitu dla każdego źródła danych przesyłania strumieniowego, aby umożliwić zapytaniu odrzucenie informacji o stanie starych rekordów. Bez znaczników wodnych Structured Streaming próbuje przy każdym wyzwoleniu połączyć każdy klucz z obu stron złączenia, co może wpływać na wydajność.

W przypadku sprzężeń zewnętrznych watermarking jest obowiązkowy. Gdy rekord jest niezgodny, zapytanie zapisuje wartość null dla tego klucza. Ponieważ złączenia obsługują tylko tryb dopisywania, niedopasowane rekordy nie są zapisywane, dopóki nie upłynie próg dopuszczalnego opóźnienia.

Kontrolowanie progu opóźnionych danych przy użyciu wielu zasad znaków wodnych

W przypadku wielu wejść Structured Streaming można ustawić wiele znaczników wodnych, aby kontrolować progi tolerancji dla opóźnionych danych. Watermarki umożliwiają sterowanie informacjami o stanie i opóźnieniem.

Zapytanie przesyłane strumieniowo może mieć wiele strumieni wejściowych, które są połączone lub złączone. W przypadku operacji stanowych każdy ze strumieni wejściowych może wymagać innego progu tolerancji dla spóźnionych danych. Określ te progi przy użyciu withWatermark("eventTime", delay) dla każdego strumienia wejściowego. Poniżej przedstawiono przykładowe zapytanie z łączeniami strumieni-strumieni.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Podczas uruchamiania zapytania z operacjami stanowymi usługa Structured Streaming indywidualnie śledzi maksymalny czas zdarzenia dla każdego strumienia wejściowego, oblicza znaki wodne na podstawie odpowiedniego opóźnienia i określa pojedynczy globalny znak wodny. Domyślnie przesyłanie strumieniowe ze strukturą używa wartości minimalnej jako globalnego limitu. Jeśli jeden ze strumieni zacznie pozostawać w tyle za pozostałymi, minimalny globalny znacznik wodny zapobiega przypadkowemu oznaczeniu danych jako spóźnionych przez zapytanie. Na przykład może się tak zdarzyć, gdy jeden ze strumieni przestaje otrzymywać dane z powodu awarii po stronie źródła. Globalny znacznik wodny bezpiecznie przesuwa się w tempie najwolniejszego strumienia i w razie potrzeby opóźnia wynik zapytania.

Aby zmniejszyć opóźnienie, ustaw spark.sql.streaming.multipleWatermarkPolicy na max (wartość domyślna to min), aby jako globalnego znacznika wodnego użyć znacznika wodnego najszybszego strumienia. Jednak ta konfiguracja usuwa dane z najwolniejszych strumieni. Usługa Databricks zaleca stosowanie tej konfiguracji z ostrożnością.

Stosowanie znaków wodnych do odrębnych operacji

Operacja distinct rejestruje każdy unikalny rekord w stanie. Bez znaku wodnego stan rośnie w nieskończoność i może powodować problemy z pamięcią. Określ znak wodny w polu sygnatury czasowej, aby powiązać stan i usunąć stare rekordy po osiągnięciu progu.

Poniższy przykład stosuje znak wodny dla operacji distinct:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

W tym przykładzie zapytanie strumieniowe usuwa zduplikowane rekordy, które napływają w ciągu 1 godziny od ostatnio zaobserwowanego eventTime. Zapytanie odrzuca informacje o stanie deduplikacji po osiągnięciu progu.

Ważne

Aby deduplikować określone kolumny zamiast wszystkich kolumn, użyj polecenia dropDuplicates() lub dropDuplicatesWithinWatermark() zamiast distinct. Zobacz Usuwanie duplikatów w znaku wodnym.

Usuwanie duplikatów w znaku wodnym

W środowisku Databricks Runtime 13.3 LTS lub nowszym można użyć unikalnego identyfikatora do usuwania duplikatów rekordów w obrębie progu watermark.

Przesyłanie strumieniowe ze strukturą gwarantuje dokładnie jednokrotne przetwarzanie, ale nie deduplikuje rekordów ze źródeł danych. Służy dropDuplicatesWithinWatermark do usuwania duplikatów w dowolnym polu, nawet jeśli pola różnią się między zduplikowanymi rekordami, takimi jak czas zdarzenia lub czas przybycia.

W przypadku funkcji dropDuplicatesWithinWatermark zapytania zawsze usuwają zduplikowane rekordy, które docierają w obrębie progu znacznika wodnego. Zapytania mogą również deduplikować rekordy, które napływają po upływie tego progu, ale nie ma na to gwarancji. Aby zagwarantować, że zapytania porzucają wszystkie duplikaty, ustaw próg limitu na większy niż maksymalna różnica sygnatury czasowej między zduplikowanymi zdarzeniami.

Aby użyć metody dropDuplicatesWithinWatermark, należy określić znak wodny:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

Przykłady przypadków użycia

W poniższych przykładach przedstawiono zaawansowane przypadki użycia okien:

Użyj okien stałoczasowych, aby obliczyć godzinowe sumy sprzedaży

Okna stałe mają stały rozmiar i niepokrywające się przedziały. Każdy wiersz wejściowy należy do dokładnie jednego okna. Użyj okien przeskakujących, aby obliczać agregacje dla odrębnych przedziałów czasu, takie jak godzinowe sumy sprzedaży:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

W tym przykładzie:

  • window("timestamp", "1 hour") grupuje zamówienia na nienakładające się interwały 1-godzinne, takie jak od 5 do 6:00 i od 6 do 7:00.
  • withWatermark("timestamp", "1 hour") przechowuje stan agregacji każdego okna, dopóki znacznik czasu zakończenia okna nie jest o 1 godzinę wcześniejszy od maksymalnego znacznika czasu zamówienia.

Użyj okien przesuwnych do obliczania agregatów kroczących

Okna przesuwne mają stały rozmiar i przedziały, które mogą się nakładać. Pojedynczy wiersz może należeć do wielu okien. Użyj okien przesuwnych, aby obliczyć agregacje stopniowe, takie jak sprzedaż w okresie 6-godzinnym:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

W tym przykładzie:

  • window("timestamp", "6 hours", slideDuration="1 hour") grupuje zamówienia w 6-godzinne przedziały, które przesuwają się co 1 godzinę, na przykład od 5:00 do 11:00 oraz od 6:00 do 12:00.
  • withWatermark("timestamp", "1 hour") program przechowuje agregację każdego okna w stanie, dopóki sygnatura czasowa zakończenia okna jest 1 godzina starsza niż maksymalna sygnatura czasowa zamówienia.
  • slideDuration wartość musi być mniejsza lub równa windowDuration.

Sprawdzanie aktywności użytkownika za pomocą okien sesji

Okna sesji nie mają stałego rozmiaru. Okno otwiera się, gdy nadejdzie wiersz, i zamyka się po upływie przerwy, w której nie pojawiają się żadne nowe wiersze. Użyj okien sesji, aby agregować wzrosty aktywności między długimi okresami bezczynności, takimi jak wyświetlenia stron użytkownika w ciągu 30 minut:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

W tym przykładzie:

  • session_window("timestamp", gapDuration="30 minutes") Otwiera okno po nadejściu pierwszego widoku strony. Każdy kolejny widok strony, który pojawia się w ciągu 30 minut, rozszerza okno. Gdy żaden widok strony nie pojawi się w ciągu 30 minut, okno zostanie zamknięte, a następny widok strony uruchomi nowe okno.
  • withWatermark("timestamp", "1 hour") przechowuje zagregowane dane każdej sesji w pamięci stanu, dopóki znacznik czasu zakończenia okna nie będzie o 1 godzinę wcześniejszy niż maksymalny znacznik czasu odsłony strony.
  • Argument timeColumn dla window() i session_window() musi być typu TimestampType lub TimestampNTZType.
  • Służy current_timestamp() do definiowania okien na podstawie czasu przetwarzania, a nie czasu zdarzenia.
  • Czasy trwania okien można ustawić z mikrosekund do dni. Czasy trwania miesiąca i dłuższe nie są obsługiwane.
  • Użyj complete trybu danych wyjściowych z agregacjami okien, aby zachować cały stan okna na czas nieokreślony. Użyj trybu wyjściowego append z odpowiednim znakiem wodnym, aby ograniczyć przyrost stanu i zapobiec problemom z pamięcią w przypadku dużych zbiorów danych. Aby uzyskać więcej informacji na temat zachowania trybu danych wyjściowych, zobacz Watermarks and output mode for windowed aggregations (Znaki wodne i tryb danych wyjściowych dla agregacji okien).