Dokumentacja trybu w czasie rzeczywistym

Obsługiwane języki

Tryb czasu rzeczywistego obsługuje język Scala, Java i Python.

Typy obliczeniowe

Tryb czasu rzeczywistego obsługuje następujące typy obliczeń:

Typ środowiska obliczeniowego Wsparte
Dedykowany (dawniej: dla pojedynczego użytkownika)
Standardowa (dawniej: udostępnione) √ (tylko Python)
Lakeflow Spark Deklaratywne potoki klasyczne Nieobsługiwane (patrz uwaga poniżej)
Deklaratywne potoki Lakeflow Spark bezserwerowe Nieobsługiwane (patrz uwaga poniżej)
Serverless Niewspierane

Uwaga / Notatka

Tryb czasu rzeczywistego nie jest obsługiwany jako bezpośredni typ wyzwalacza przesyłania strumieniowego ze strukturą w potokach deklaratywnych spark lakeflow. Potoki deklaratywne platformy Spark lakeflow obsługują jednak tryb czasu rzeczywistego za pośrednictwem konfiguracji na poziomie potoku. Zobacz Używanie trybu czasu rzeczywistego w potokach deklaratywnych spark w usłudze Lakeflow.

W przypadku obciążeń wrażliwych na opóźnienia z funkcjami zdefiniowanymi przez użytkownika usługa Databricks zaleca korzystanie z trybu dedykowanego dostępu. Zobacz Funkcje tabeli.

Tryby wykonywania

Tryb czasu rzeczywistego obsługuje tylko tryb aktualizacji:

Tryb wykonywania Wsparte
Tryb aktualizacji
tryb dołączania Niewspierane
Tryb pełny Niewspierane

Źródła i ujścia

Tryb czasu rzeczywistego obsługuje następujące źródła i ujścia:

Źródło lub ujście Jako źródło Jako ujście
Apache Kafka
Event Hubs (przy użyciu łącznika Kafka)
Kinesis √ (tylko tryb EFO) Niewspierane
AWS MSK Niewspierane
Delta Niewspierane Niewspierane
Google Pub/Sub (usługa przesyłania wiadomości) Niewspierane Niewspierane
Apache Pulsar Niewspierane Niewspierane
Dowolne ujścia (przy użyciu forEachWriter) Nie dotyczy

Operatorów

Tryb czasu rzeczywistego obsługuje większość operatorów przesyłania strumieniowego ze strukturą:

Operacje bezstanowe

Obsługujący Wsparte
Selekcja
Projekcja

UDFs

Obsługujący Wsparte
Scala UDF √ (z pewnymi ograniczeniami)
Python funkcji zdefiniowanej przez użytkownika √ (z pewnymi ograniczeniami)

Aggregation

Obsługujący Wsparte
sum
liczba
max
min
avg
Funkcje agregacji

Windowing

Obsługujący Wsparte
Tumbling
Sliding
Session Niewspierane

Deduplication

Obsługujący Wsparte
usuńDuplikaty √ (stan jest niezwiązany)
UsuńDuplikatyWZnakuWodnym Niewspierane

Przesyłanie strumieniowe do sprzężenia tabeli

Obsługujący Wsparte
Rozgłaszaj sprzężenia tabeli (tabela powinna być mała)
Przesyłanie strumieniowe do sprzężenia strumieniowego Niewspierane
(płaskie)MapGroupsWithState Niewspierane
transformWithState √ (z pewnymi różnicami)
związek √ (z pewnymi ograniczeniami)
forEach
forEachBatch Niewspierane
mapPartitions Nieobsługiwane (zobacz ograniczenie)

Uwagi specjalne

Niektóre operatory i funkcje mają konkretne zagadnienia lub różnice w przypadku użycia w trybie czasu rzeczywistego.

transformWithState w trybie czasu rzeczywistego

W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.

Jednak interfejs API działa inaczej w trybie czasu rzeczywistego niż w zapytaniach mikrosadowych.

  • Tryb czasu rzeczywistego wywołuje metodę handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) dla każdego wiersza.
    • Iterator inputRows zwraca pojedynczą wartość. Tryb mikrosadowy wywołuje go raz dla każdego klucza, a iterator inputRows zwraca wszystkie wartości dla danego klucza w danej mikrosadowej partii.
    • Uwzględnij tę różnicę podczas pisania kodu
  • Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
  • transformWithStateInPandas nie jest obsługiwany w trybie czasu rzeczywistego. Zamiast tego użyj interfejsu API opartego na transformWithState wierszach, który używa Row obiektów, a nie ramek danych biblioteki pandas.
  • W trybie czasu rzeczywistego zegary są opóźnione w uruchamianiu w zależności od przybycia danych.
    • Jeśli czasomierz jest zaplanowany na 10:00:00, ale żadne dane nie docierają, czasomierz nie jest uruchamiany natychmiast.
    • Jeśli dane pojawią się o godzinie 10:00:10, czasomierz zostanie wyzwolony z 10-sekundowym opóźnieniem.
    • Jeśli żadne dane nie docierają, a długotrwała partia kończy działanie, czasomierz jest uruchamiany przed zakończeniem partii.

Uwaga / Notatka

W środowisku Databricks Runtime 18.1 lub nowszym, jeśli używasz transformWithState i trybu czasu rzeczywistego dla Python z niską przepływnością, mniej niż 5 rekordów na sekundę, może być widoczne zwiększone opóźnienie do kilkuset milisekund. Usługa Databricks zaleca uaktualnienie do środowiska Databricks Runtime 18.2 lub nowszego w celu rozwiązania problemu.

Python UDF w trybie czasu rzeczywistego

Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) Python w trybie czasu rzeczywistego:

Bezstanowa

Typ UDF Wsparte
Python funkcji skalarnych UDF (Użytkowników zdefiniowanych przez użytkownika — Python)
Funkcja skalarna Arrow zdefiniowana przez użytkownika (UDF)
Funkcja UDF skalarna biblioteki Pandas (funkcje zdefiniowane przez użytkownika biblioteki Pandas)
Funkcja strzałkowa (mapInArrow)
Funkcja Pandas (Map)

Grupowanie stanowe (UDAF)

Typ UDF Wsparte
transformWithState (tylko Row interfejs)
transformWithStateInPandas Niewspierane. Zamiast tego użyj interfejsu API opartego na transformWithState wierszach, który używa Row obiektów, a nie ramek danych biblioteki pandas. Aby uzyskać szczegółowe informacje, zobacz transformWithStateInPandas nieobsługiwane .
applyInPandasWithState Niewspierane

Grupowanie niestanowe (UDAF)

Typ UDF Wsparte
apply Niewspierane
applyInArrow Niewspierane
applyInPandas Niewspierane

Funkcje tabeli

Typ UDF Wsparte
UDTF (Python funkcje tabeli zdefiniowane przez użytkownika (UDTFs)) Niewspierane
UC UDF Niewspierane

Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji zdefiniowanych przez użytkownika Python w trybie czasu rzeczywistego:

  • Aby zminimalizować opóźnienie, ustaw rozmiar partii strzałki (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
    • Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
    • Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
  • Funkcje Pandas UDF oraz inne funkcje nie działają dobrze z rozmiarem partii Arrow ustawionym na 1.
    • Jeśli używasz UDF-ów lub funkcji pandas, ustaw rozmiar partii Arrow na wyższą wartość (na przykład 100 lub więcej).
    • Oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji UDF strzałki, jeśli jest to możliwe.
  • transformWithStateInPandas nie jest obsługiwany w trybie czasu rzeczywistego. Zamiast tego użyj interfejsu API opartego na transformWithState wierszach, który używa Row obiektów, a nie ramek danych biblioteki pandas. Zobacz transformWithStateInPandas nieobsługiwane i Rejezys przykład pracy Python przy użyciu interfejsu API opartego na wierszach.
  • W przypadku obciążeń wrażliwych na opóźnienia z funkcjami zdefiniowanymi przez użytkownika usługa Databricks zaleca korzystanie z trybu dedykowanego dostępu. W standardowym trybie dostępu obciążenie związane z izolacją zabezpieczeń może spowolnić wydajność funkcji zdefiniowanej przez użytkownika.