Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ta strona zawiera informacje referencyjne dotyczące trybu czasu rzeczywistego w przesyłania strumieniowego ze strukturą, w tym obsługiwane środowiska, języki, źródła, ujścia, operatory i znane ograniczenia.
Obsługiwane środowiska, języki i tryby
Obsługiwane języki: Tryb czasu rzeczywistego obsługuje języki Scala, Java i Python.
Obsługiwane typy obliczeń:
| Typ środowiska obliczeniowego | Wsparte |
|---|---|
| Dedykowany (dawniej: dla pojedynczego użytkownika) | ✓ |
| Standardowa (dawniej: udostępnione) | √ (tylko język Python) |
| Lakeflow Spark Deklaratywne potoki klasyczne | Niewspierane |
| Deklaratywne potoki Lakeflow Spark bezserwerowe | Niewspierane |
| Serverless | Niewspierane |
Obsługiwane tryby wykonywania:
| Tryb wykonywania | Wsparte |
|---|---|
| Tryb aktualizacji | ✓ |
| tryb dołączania | Niewspierane |
| Tryb pełny | Niewspierane |
Obsługa źródła i ujścia
| Źródło lub ujście | Jako źródło | Jako ujście |
|---|---|---|
| Apache Kafka | ✓ | ✓ |
| Event Hubs (przy użyciu łącznika Kafka) | ✓ | ✓ |
| Kinesis | √ (tylko tryb EFO) | Niewspierane |
| AWS MSK | ✓ | Niewspierane |
| Delta | Niewspierane | Niewspierane |
| Google Pub/Sub (usługa przesyłania wiadomości) | Niewspierane | Niewspierane |
| Apache Pulsar | Niewspierane | Niewspierane |
Dowolne ujścia (przy użyciu forEachWriter) |
Nie dotyczy | ✓ |
Obsługiwane operatory
| Operatorów | Wsparte |
|---|---|
| Operacje bezstanowe | |
| Selekcja | ✓ |
| Projekcja | ✓ |
| funkcje zdefiniowane przez użytkownika (UDF) | |
| Scala UDF | √ (z pewnymi ograniczeniami) |
| Python UDF | √ (z pewnymi ograniczeniami) |
| Agregacja | |
| sum | ✓ |
| liczba | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| Funkcje agregacji | ✓ |
| Windowing | |
| Tumbling | ✓ |
| Sliding | ✓ |
| Session | Niewspierane |
| Deduplikacja | |
| usuńDuplikaty | √ (stan jest niezwiązany) |
| UsuńDuplikatyWZnakuWodnym | Niewspierane |
| Strumień / Łączenie tabel | |
| Tabela nadawania (powinna być mała) | ✓ |
| Strumień — przyłączanie do strumienia | Niewspierane |
| (płaskie)MapGroupsWithState | Niewspierane |
| transformWithState | √ (z pewnymi różnicami) |
| związek | √ (z pewnymi ograniczeniami) |
| forEach | ✓ |
| forEachBatch | Niewspierane |
| mapPartitions | Nieobsługiwane (zobacz ograniczenie) |
Uwagi specjalne
Niektóre operatory i funkcje mają konkretne zagadnienia lub różnice w przypadku użycia w trybie czasu rzeczywistego.
transformWithState w trybie czasu rzeczywistego
W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.
Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.
- Tryb czasu rzeczywistego wywołuje metodę
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)dla każdego wiersza.- Iterator
inputRowszwraca pojedynczą wartość. Tryb mikrosadowy wywołuje go raz dla każdego klucza, a iteratorinputRowszwraca wszystkie wartości dla danego klucza w danej mikrosadowej partii. - Uwzględnij tę różnicę podczas pisania kodu
- Iterator
- Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
- W trybie czasu rzeczywistego zegary są opóźnione w uruchamianiu w zależności od przybycia danych.
- Jeśli czasomierz jest zaplanowany na 10:00:00, ale żadne dane nie docierają, czasomierz nie jest uruchamiany natychmiast.
- Jeśli dane pojawią się o godzinie 10:00:10, czasomierz zostanie wyzwolony z 10-sekundowym opóźnieniem.
- Jeśli żadne dane nie docierają, a długotrwała partia kończy działanie, czasomierz jest uruchamiany przed zakończeniem partii.
Funkcje zdefiniowane przez użytkownika języka Python w trybie czasu rzeczywistego
Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:
| Kategoria | Typ UDF | Wsparte |
|---|---|---|
| Bezstanowa | Funkcja UDF skalarna języka Python (funkcje skalarne zdefiniowane przez użytkownika — Python) | ✓ |
| Bezstanowa | Funkcja skalarna Arrow zdefiniowana przez użytkownika (UDF) | ✓ |
| Bezstanowa | Funkcja UDF skalarna biblioteki Pandas (funkcje zdefiniowane przez użytkownika biblioteki Pandas) | ✓ |
| Bezstanowa | Funkcja strzałkowa (mapInArrow) |
✓ |
| Bezstanowa | Funkcja Pandas (Map) | ✓ |
| Grupowanie stanowe (UDAF) |
transformWithState (Row tylko interfejs) |
✓ |
| Grupowanie stanowe (UDAF) | applyInPandasWithState |
Niewspierane |
| Grupowanie niestanowe (UDAF) | apply |
Niewspierane |
| Grupowanie niestanowe (UDAF) | applyInArrow |
Niewspierane |
| Grupowanie niestanowe (UDAF) | applyInPandas |
Niewspierane |
| Funkcja tabeli | UDTF (funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs)) | Niewspierane |
| Funkcja tabeli | UC UDF | Niewspierane |
Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:
- Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (
spark.sql.execution.arrow.maxRecordsPerBatch) na 1.- Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
- Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
- Funkcje Pandas UDF oraz inne funkcje nie działają dobrze z rozmiarem partii Arrow ustawionym na 1.
- Jeśli używasz UDF-ów lub funkcji pandas, ustaw rozmiar partii Arrow na wyższą wartość (na przykład 100 lub więcej).
- Oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji UDF strzałki, jeśli jest to możliwe.
- Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie
Row.
Ograniczenia
Ograniczenia źródła
W przypadku kinesis tryb czasu rzeczywistego nie obsługuje trybu sondowania. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.
Ograniczenia unii
Operator Unii ma pewne ograniczenia:
- Tryb czasu rzeczywistego nie obsługuje samodzielnej unii:
- Kafka: Nie można używać tego samego obiektu ramki danych źródłowych do łączenia pochodnych ramek danych z niego. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
- Kinesis: nie można połączyć ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
- Tryb czasu rzeczywistego nie obsługuje operatorów stanowych (na przykład
aggregate, ,deduplicatetransformWithState) zdefiniowanych przed Unią. - Tryb czasu rzeczywistego nie obsługuje łączenia ze źródłami wsadowymi.
Ograniczenie usługi MapPartitions
mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) przyjmuje iterator całej partycji wejściowej i tworzy iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.
Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.