Udostępnij za pośrednictwem


Dokumentacja trybu w czasie rzeczywistym

Ta strona zawiera informacje referencyjne dotyczące trybu czasu rzeczywistego w przesyłania strumieniowego ze strukturą, w tym obsługiwane środowiska, języki, źródła, ujścia, operatory i znane ograniczenia.

Obsługiwane środowiska, języki i tryby

Obsługiwane języki: Tryb czasu rzeczywistego obsługuje języki Scala, Java i Python.

Obsługiwane typy obliczeń:

Typ środowiska obliczeniowego Wsparte
Dedykowany (dawniej: dla pojedynczego użytkownika)
Standardowa (dawniej: udostępnione) √ (tylko język Python)
Lakeflow Spark Deklaratywne potoki klasyczne Niewspierane
Deklaratywne potoki Lakeflow Spark bezserwerowe Niewspierane
Serverless Niewspierane

Obsługiwane tryby wykonywania:

Tryb wykonywania Wsparte
Tryb aktualizacji
tryb dołączania Niewspierane
Tryb pełny Niewspierane

Obsługa źródła i ujścia

Źródło lub ujście Jako źródło Jako ujście
Apache Kafka
Event Hubs (przy użyciu łącznika Kafka)
Kinesis √ (tylko tryb EFO) Niewspierane
AWS MSK Niewspierane
Delta Niewspierane Niewspierane
Google Pub/Sub (usługa przesyłania wiadomości) Niewspierane Niewspierane
Apache Pulsar Niewspierane Niewspierane
Dowolne ujścia (przy użyciu forEachWriter) Nie dotyczy

Obsługiwane operatory

Operatorów Wsparte
Operacje bezstanowe
Selekcja
Projekcja
funkcje zdefiniowane przez użytkownika (UDF)
Scala UDF √ (z pewnymi ograniczeniami)
Python UDF √ (z pewnymi ograniczeniami)
Agregacja
sum
liczba
max
min
avg
Funkcje agregacji
Windowing
Tumbling
Sliding
Session Niewspierane
Deduplikacja
usuńDuplikaty √ (stan jest niezwiązany)
UsuńDuplikatyWZnakuWodnym Niewspierane
Strumień / Łączenie tabel
Tabela nadawania (powinna być mała)
Strumień — przyłączanie do strumienia Niewspierane
(płaskie)MapGroupsWithState Niewspierane
transformWithState √ (z pewnymi różnicami)
związek √ (z pewnymi ograniczeniami)
forEach
forEachBatch Niewspierane
mapPartitions Nieobsługiwane (zobacz ograniczenie)

Uwagi specjalne

Niektóre operatory i funkcje mają konkretne zagadnienia lub różnice w przypadku użycia w trybie czasu rzeczywistego.

transformWithState w trybie czasu rzeczywistego

W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.

Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.

  • Tryb czasu rzeczywistego wywołuje metodę handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) dla każdego wiersza.
    • Iterator inputRows zwraca pojedynczą wartość. Tryb mikrosadowy wywołuje go raz dla każdego klucza, a iterator inputRows zwraca wszystkie wartości dla danego klucza w danej mikrosadowej partii.
    • Uwzględnij tę różnicę podczas pisania kodu
  • Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
  • W trybie czasu rzeczywistego zegary są opóźnione w uruchamianiu w zależności od przybycia danych.
    • Jeśli czasomierz jest zaplanowany na 10:00:00, ale żadne dane nie docierają, czasomierz nie jest uruchamiany natychmiast.
    • Jeśli dane pojawią się o godzinie 10:00:10, czasomierz zostanie wyzwolony z 10-sekundowym opóźnieniem.
    • Jeśli żadne dane nie docierają, a długotrwała partia kończy działanie, czasomierz jest uruchamiany przed zakończeniem partii.

Funkcje zdefiniowane przez użytkownika języka Python w trybie czasu rzeczywistego

Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:

Kategoria Typ UDF Wsparte
Bezstanowa Funkcja UDF skalarna języka Python (funkcje skalarne zdefiniowane przez użytkownika — Python)
Bezstanowa Funkcja skalarna Arrow zdefiniowana przez użytkownika (UDF)
Bezstanowa Funkcja UDF skalarna biblioteki Pandas (funkcje zdefiniowane przez użytkownika biblioteki Pandas)
Bezstanowa Funkcja strzałkowa (mapInArrow)
Bezstanowa Funkcja Pandas (Map)
Grupowanie stanowe (UDAF) transformWithState (Row tylko interfejs)
Grupowanie stanowe (UDAF) applyInPandasWithState Niewspierane
Grupowanie niestanowe (UDAF) apply Niewspierane
Grupowanie niestanowe (UDAF) applyInArrow Niewspierane
Grupowanie niestanowe (UDAF) applyInPandas Niewspierane
Funkcja tabeli UDTF (funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs)) Niewspierane
Funkcja tabeli UC UDF Niewspierane

Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:

  • Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
    • Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
    • Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
  • Funkcje Pandas UDF oraz inne funkcje nie działają dobrze z rozmiarem partii Arrow ustawionym na 1.
    • Jeśli używasz UDF-ów lub funkcji pandas, ustaw rozmiar partii Arrow na wyższą wartość (na przykład 100 lub więcej).
    • Oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji UDF strzałki, jeśli jest to możliwe.
  • Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie Row .

Ograniczenia

Ograniczenia źródła

W przypadku kinesis tryb czasu rzeczywistego nie obsługuje trybu sondowania. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.

Ograniczenia unii

Operator Unii ma pewne ograniczenia:

  • Tryb czasu rzeczywistego nie obsługuje samodzielnej unii:
    • Kafka: Nie można używać tego samego obiektu ramki danych źródłowych do łączenia pochodnych ramek danych z niego. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
    • Kinesis: nie można połączyć ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
  • Tryb czasu rzeczywistego nie obsługuje operatorów stanowych (na przykład aggregate, , deduplicatetransformWithState) zdefiniowanych przed Unią.
  • Tryb czasu rzeczywistego nie obsługuje łączenia ze źródłami wsadowymi.

Ograniczenie usługi MapPartitions

mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) przyjmuje iterator całej partycji wejściowej i tworzy iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.

Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.