Udostępnij za pośrednictwem


Wybieranie trybu danych wyjściowych dla przesyłania strumieniowego ze strukturą

W tym artykule omówiono wybieranie trybu danych wyjściowych dla stanowego przesyłania strumieniowego. Tylko strumienie stanowe zawierające agregacje wymagają konfiguracji trybu wyjściowego.

Sprzężenia obsługują tylko tryb danych wyjściowych dołączania, a tryb wyjściowy nie ma wpływu na deduplikację. Dowolne operatory mapGroupsWithState stanowe i flatMapGroupsWithState emitują rekordy przy użyciu własnej logiki niestandardowej, więc tryb wyjściowy strumienia nie ma wpływu na ich zachowanie.

W przypadku przesyłania strumieniowego bezstanowego wszystkie tryby wyjściowe zachowują się tak samo.

Aby poprawnie skonfigurować tryb wyjściowy, musisz zrozumieć stanowe przesyłanie strumieniowe, znaki wodne i wyzwalacze. Odwiedź następujące artykuły:

Co to jest tryb wyjściowy?

Tryb wyjściowy zapytania przesyłania strumieniowego ze strukturą określa, które rekordy operatory zapytania emitują podczas każdego wyzwalacza. Trzy typy rekordów, które mogą być emitowane, to:

  • Rejestruje, że przyszłe przetwarzanie nie zmienia się.
  • Rekordy, które uległy zmianie od ostatniego wyzwalacza.
  • Wszystkie rekordy w tabeli stanów.

Znajomość typów rekordów do emisji jest ważna dla operatorów stanowych, ponieważ określony wiersz generowany przez operator stanowy może zmienić się z wyzwalacza na wyzwalacz. Na przykład, gdy operator agregacji przesyłania strumieniowego otrzymuje więcej wierszy dla określonego okna, wartości agregacji tego okna mogą ulec zmianie między wyzwalaczami.

W przypadku operatorów bezstanowych rozróżnienie między typami rekordów nie ma wpływu na zachowanie operatora. Rekordy operator bezstanowy emitowany podczas wyzwalacza są zawsze rekordami źródłowymi przetwarzanymi podczas tego wyzwalacza.

Dostępne tryby wyjściowe

Istnieją trzy tryby wyjściowe, które informują operator, które rekordy mają być emitowane podczas określonego wyzwalacza:

Tryb danych wyjściowych opis
Tryb dołączania (ustawienie domyślne) Domyślnie zapytania przesyłane strumieniowo są uruchamiane w trybie dołączania. W tym trybie operatory emitują tylko wiersze, które nie zmieniają się w przyszłych wyzwalaczach. Operatory stanowe używają znaku wodnego, aby określić, kiedy tak się stanie.
Tryb aktualizacji W trybie aktualizacji operatory emitują wszystkie wiersze, które zmieniły się podczas wyzwalacza, nawet jeśli emitowany rekord może ulec zmianie w kolejnym wyzwalaczu.
Tryb ukończenia Tryb kompletny działa tylko z agregacjami przesyłania strumieniowego. W trybie pełnym wszystkie wynikowe wiersze tworzone przez operatora są emitowane podrzędnie.

Zagadnienia dotyczące środowiska produkcyjnego

W przypadku wielu stanowych operacji przesyłania strumieniowego należy wybrać między trybami dołączania i aktualizacji. W poniższych sekcjach opisano zagadnienia, które mogą informować o twojej decyzji.

Uwaga

Tryb kompletny ma pewne aplikacje, ale może działać słabo w miarę skalowania danych. Usługa Databricks zaleca używanie zmaterializowanych widoków w celu uzyskania gwarancji semantycznych skojarzonych z trybem pełnym z przetwarzaniem przyrostowym dla wielu operacji stanowych. Zobacz Używanie zmaterializowanych widoków w usłudze Databricks SQL.

Semantyka aplikacji

Semantyka aplikacji opisuje sposób używania danych przesyłanych strumieniowo przez aplikacje podrzędne.

Jeśli usługi podrzędne muszą wykonać jedną akcję dla każdego podrzędnego zapisu, w większości przypadków użyj trybu dołączania. Jeśli na przykład masz usługę powiadomień podrzędnych wysyłającą powiadomienia dla każdego nowego rekordu zapisanego w ujściu, tryb dołączania gwarantuje, że każdy rekord jest zapisywany tylko raz. Tryb aktualizacji zapisuje rekord za każdym razem, gdy zmienia się informacje o stanie, co spowodowałoby liczne aktualizacje.

Jeśli usługi podrzędne potrzebują nowych wyników, tryb aktualizacji gwarantuje, że ujście pozostanie tak aktualne, jak to możliwe. Przykłady obejmują model uczenia maszynowego, który odczytuje funkcje w czasie rzeczywistym lub pulpit nawigacyjny analizy śledzący agregacje w czasie rzeczywistym.

Zgodność operatora i ujścia

Przesyłanie strumieniowe ze strukturą nie obsługuje wszystkich operacji dostępnych na platformie Apache Spark, a niektóre operacje przesyłania strumieniowego nie są obsługiwane we wszystkich trybach wyjściowych. Aby uzyskać więcej informacji na temat ograniczeń operatora, zobacz dokumentację przesyłania strumieniowego systemu operacyjnego.

Nie wszystkie ujścia obsługują wszystkie tryby wyjściowe. Zarówno usługa Delta Lake, która obsługuje wszystkie tabele zarządzane przez wykaz aparatu Unity, jak i platformę Kafka obsługują wszystkie tryby wyjściowe. Aby uzyskać więcej informacji na temat zgodności ujścia, zobacz dokumentację przesyłania strumieniowego systemu operacyjnego.

Opóźnienie i koszty

Tryb danych wyjściowych ma wpływ na czas, jaki musi upłynąć przed zapisaniem rekordu, a częstotliwość i ilość zapisanych danych mogą mieć wpływ na koszty związane z potokami przesyłania strumieniowego.

Tryb dołączania wymusza operatorom stanowym emitowanie wyników tylko po sfinalizowaniu wyników stanowych, co jest co najmniej tak długo, jak opóźnienie znaku wodnego. Opóźnienie limitu 1 hour w trybie danych wyjściowych dołączania oznacza, że rekordy mają co najmniej 1-godzinne opóźnienie przed emitowanym podrzędnym.

Tryb aktualizacji powoduje jedno zapis na wyzwalacz na wartość zagregowaną. Jeśli opłaty za ujście na zapis na rekord mogą być kosztowne, jeśli rekordy są aktualizowane wiele razy przed upływem opóźnienia limitu.

Przykłady konfiguracji

W poniższych przykładach kodu pokazano konfigurowanie trybu danych wyjściowych dla aktualizacji przesyłania strumieniowego do tabel wykazu aparatu Unity:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Zobacz dokumentację systemu operacyjnego dla PySpark DataStreamWriter.outputMode lub Scala DataStreamWriter.outputMode.

Przykład stanowego przesyłania strumieniowego i trybów wyjściowych

Poniższy przykład ma pomóc w sposobie interakcji trybu danych wyjściowych z znakami wodnymi na potrzeby przesyłania strumieniowego stanowego.

Rozważ agregację przesyłania strumieniowego, która oblicza całkowity przychód wygenerowany co godzinę w sklepie z opóźnieniem limitu 15 minut. Pierwszy mikrobajt przetwarza następujące rekordy:

  • $15 o 2:40pm
  • $10 o 14:30
  • $30 o 15:10

W tym momencie znak wodny silnika jest 2:55, ponieważ odejmuje 15 minut (opóźnienie) od maksymalnego czasu widocznego (15:10). Operator agregacji przesyłania strumieniowego ma następujące elementy w swoim stanie:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $30

W poniższej tabeli przedstawiono, co się stanie w każdym trybie danych wyjściowych:

Tryb danych wyjściowych Wynik i przyczyna
Dołączanie Operator agregacji przesyłania strumieniowego nie emituje niczego podrzędnego. Wynika to z faktu, że oba te okna mogą ulec zmianie w miarę pojawiania się nowych wartości z kolejnym wyzwalaczem: znak wodny 2:55pm wskazuje, że rekordy po godzinie 2:55 mogą nadal przybywać, a te rekordy mogą należeć do [2pm, 3pm] okna lub [3pm, 4pm] okna.
Zaktualizuj Operator emituje oba rekordy, ponieważ oba rekordy otrzymały aktualizacje.
Ukończ Operator emituje wszystkie rekordy.

Teraz załóżmy, że strumień odbiera jeszcze jeden rekord:

  • $20 o 15:20

Znak wodny aktualizuje się do 15:05, ponieważ silnik odejmuje 15 minut od 15:20. W tym momencie operator agregacji przesyłania strumieniowego ma następujące elementy w stanie:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $50

W poniższej tabeli przedstawiono, co się stanie w każdym trybie danych wyjściowych:

Tryb danych wyjściowych Wynik i przyczyna
Dołączanie Operator agregacji przesyłania strumieniowego obserwuje znak wodny 15:05 jest większy niż koniec [2pm, 3pm] okna. Zgodnie z definicją znaku wodnego to okno nie może już ulec zmianie, więc emituje [2pm, 3pm] okno.
Zaktualizuj Operator agregacji przesyłania strumieniowego [3pm, 4pm] emituje okno, ponieważ wartość stanu zmieniła się z $30 na $50.
Ukończ Operator emituje wszystkie rekordy.

Poniżej przedstawiono podsumowanie zachowania operatorów stanowych w każdym trybie dołączania:

  • W trybie dołączania zapisuj rekordy raz po opóźnieniu znaku wodnego.
  • W trybie aktualizacji zapisuj rekordy, które uległy zmianie od poprzedniego wyzwalacza.
  • W trybie pełnym zapisz wszystkie rekordy, które kiedykolwiek zostały wygenerowane przez operator stanowy.