Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Tryb czasu rzeczywistego jest typem wyzwalacza dla struktur przetwarzania strumieniowego, który umożliwia przetwarzanie danych z ultra niskimi opóźnieniami, a całkowite opóźnienie może być nawet pięć milisekund. Użyj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw, personalizacja w czasie rzeczywistym i natychmiastowe systemy podejmowania decyzji.
Tryb czasu rzeczywistego jest dostępny w środowisku Databricks Runtime 16.4 LTS lub nowszym. Aby uzyskać instrukcje dotyczące konfiguracji krok po kroku, zobacz Wprowadzenie do trybu w czasie rzeczywistym. Przykłady kodu można znaleźć w temacie Przykłady trybu czasu rzeczywistego.
Co to jest tryb czasu rzeczywistego?
Obciążenia operacyjne i analityczne
Obciążenia przesyłania strumieniowego można szeroko podzielić na obciążenia analityczne i obciążenia operacyjne:
- Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).
- Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.
Oto kilka przykładów obciążeń operacyjnych:
- Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
- Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.
Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się wymogiem opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.
Jak tryb czasu rzeczywistego osiąga małe opóźnienie
Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:
- Wykonywanie długotrwałych partii (wartość domyślna to pięć minut), w których system przetwarza dane w miarę ich udostępniania w źródle.
- Planowanie wszystkich etapów zapytania jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
- Przekazywanie danych między etapami zaraz po ich utworzeniu przy użyciu metody przesyłania strumieniowego shuffle.
Na końcu przetwarzania partii i przed rozpoczęciem następnej partii, Structured Streaming wykonuje punkty kontrolne postępu i publikuje metryki. Czas trwania partii wpływa na częstotliwość tworzenia punktów kontrolnych:
- Dłuższe partie: rzadsze zapisywanie punktów kontrolnych, co oznacza dłuższe powtarzanie w przypadku awarii i opóźnioną dostępność wyników pomiarów.
- Krótsze partie: częstsze tworzenie punktów kontrolnych, co może mieć wpływ na opóźnienie.
Databricks zaleca przeprowadzenie testów wydajności w trybie rzeczywistym w odniesieniu do docelowego obciążenia, aby znaleźć odpowiedni interwał wyzwalania.
Kiedy używać trybu czasu rzeczywistego
Wybierz tryb czasu rzeczywistego, gdy przypadek użycia wymaga:
- Opóźnienie podrzędne: aplikacje, które muszą reagować na dane w milisekundach, takie jak systemy wykrywania oszustw, które muszą blokować transakcje w czasie rzeczywistym.
- Podejmowanie decyzji operacyjnych: Systemy, które wyzwalają natychmiastowe akcje na podstawie danych przychodzących, takich jak oferty w czasie rzeczywistym, alerty lub powiadomienia.
- Ciągłe przetwarzanie: obciążenia, w których dane muszą być przetwarzane natychmiast po ich nadejściu, a nie w okresowych partiach.
Użyj trybu mikrosadowego (domyślnego wyzwalacza przesyłania strumieniowego ze strukturą), gdy:
- Przetwarzanie analityczne: potoki ETL, przekształcenia danych i implementacje architektury medalonu, w których wymagania dotyczące opóźnienia są mierzone w sekundach lub minutach.
- Optymalizacja kosztów: obciążenia, w których opóźnienie podrzędne sekundy nie jest wymagane, ponieważ tryb czasu rzeczywistego wymaga dedykowanych zasobów obliczeniowych.
- Częstotliwość punktów kontrolnych ma znaczenie: aplikacje, które korzystają z częstszego tworzenia punktów kontrolnych w celu szybszego odzyskiwania.
Wymagania i konfiguracja
Tryb czasu rzeczywistego ma określone wymagania dotyczące konfiguracji obliczeń i konfiguracji zapytań. W tej sekcji opisano wymagania wstępne i kroki konfiguracji wymagane do korzystania z trybu czasu rzeczywistego.
Wymagania wstępne
Aby korzystać z trybu czasu rzeczywistego, należy spełnić następujące wymagania:
- Databricks Runtime 16.4 LTS lub nowszy: tryb czasu rzeczywistego jest dostępny tylko w wersji DBR 16.4 LTS i nowszych.
- Dedykowane zasoby obliczeniowe: należy użyć dedykowanego (wcześniej przeznaczonego dla jednego użytkownika) zasobu obliczeniowego. Standardowe (wcześniej udostępnione), deklaratywne potoki Lakeflow Spark i klastry bezserwerowe nie są obsługiwane.
- Brak skalowania automatycznego: skalowanie automatyczne musi być wyłączone.
- Brak Photon: przyspieszanie Photon nie jest obsługiwane w trybie czasu rzeczywistego.
-
Konfiguracja platformy Spark: musisz ustawić wartość
spark.databricks.streaming.realTimeMode.enabledtrue.
Konfiguracja obliczeniowa
Skonfiguruj obliczenia przy użyciu następujących ustawień:
- Ustaw
spark.databricks.streaming.realTimeMode.enablednatruew konfiguracji Spark. - Wyłącz skalowanie automatyczne.
- Wyłącz przyspieszanie Photon.
- Upewnij się, że zasoby obliczeniowe są skonfigurowane jako klaster dedykowany (a nie standardowy, Lakeflow Spark Declarative Pipelines ani bezserwerowy).
Aby uzyskać instrukcje krok po kroku dotyczące tworzenia i konfigurowania zasobów obliczeniowych dla trybu czasu rzeczywistego, zobacz Wprowadzenie do trybu czasu rzeczywistego.
Konfiguracja zapytania
Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Ustalanie rozmiaru zasobów obliczeniowych
Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.
Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.
Przykłady obliczeń miejsca
| Typ potoku | Konfiguracja | Wymagane miejsca |
|---|---|---|
| Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) |
maxPartitions = 8 |
8 miejsc |
| Dwuetapowe stanowiskowe (źródło Kafka + shuffle) |
maxPartitions = 8, partycje mieszania = 20 |
28 miejsc (8 + 20) |
| Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) |
maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów |
48 gniazd (8 + 20 + 20) |
Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.
Kluczowe zagadnienia
Podczas konfigurowania obliczeń należy wziąć pod uwagę następujące kwestie:
- W przeciwieństwie do trybu mikrosadowego, zadania w czasie rzeczywistym mogą pozostawać bezczynne podczas oczekiwania na dane, więc odpowiednie dopasowanie rozmiaru jest niezbędne, aby uniknąć marnowania zasobów.
- Celuj w celu osiągnięcia docelowego poziomu wykorzystania (na przykład 50%) przez dostrajanie:
-
maxPartitions(dla platformy Kafka) -
spark.sql.shuffle.partitions(dla etapów tasowania)
-
- Usługa Databricks zaleca ustawienie
maxPartitionstak, aby każde zadanie obsługiwało wiele partycji platformy Kafka w celu zmniejszenia obciążenia. - Dostosuj gniazda zadań przypadające na pracownika, aby dopasować obciążenie pracą do prostych zadań jednofazowych.
- W przypadku zadań wymagających intensywnego mieszania eksperymentuj, aby znaleźć minimalną liczbę partycji mieszania, które unikają opóźnień, a następnie dostosuj wartość początkową. Obliczenia nie będą planować zadania, jeśli nie ma wystarczającej liczby miejsc.
Note
W środowisku Databricks Runtime 16.4 LTS i nowszym wszystkie potoki przetwarzania w czasie rzeczywistym używają punktów kontrolnych w wersji 2 (checkpoint v2), co umożliwia bezproblemowe przełączanie między trybami czasu rzeczywistego i mikrosadowego.
Techniki optymalizacji
Użyj następujących technik, aby zmniejszyć opóźnienie w trybie czasu rzeczywistego:
- Śledzenie postępu asynchronicznego: przenosi zapisy do dzienników przesunięcia i zatwierdzania do wątku asynchronicznego, co zmniejsza czas międzysadowy dla zapytań bezstanowych.
- Asynchroniczne tworzenie punktów kontrolnych stanu: rozpoczyna przetwarzanie następnej mikropartii natychmiast po zakończeniu obliczeń, bez potrzeby oczekiwania na tworzenie punktów kontrolnych stanu, co zmniejsza opóźnienie zapytań stanowych.
Note
Żadna technika nie jest domyślnie włączona. Należy je włączyć oddzielnie.
monitorowanie i obserwowanie
Mierzenie wydajności zapytań jest niezbędne w przypadku obciążeń w czasie rzeczywistym. W trybie czasu rzeczywistego tradycyjne metryki procesów wsadowych nie odzwierciedlają rzeczywistego opóźnienia, więc konieczne są alternatywne podejścia.
Kompleksowe opóźnienie jest specyficzne dla obciążenia, a czasami może być dokładnie mierzone tylko za pomocą logiki biznesowej. Jeśli na przykład źródłowy znacznik czasu jest przesyłany w Kafka, możesz obliczyć opóźnienie jako różnicę między sygnaturą czasową danych wyjściowych w Kafka a sygnaturą czasową źródła.
Możesz również oszacować kompleksowe opóźnienie przy użyciu wbudowanych metryk i interfejsów API opisanych poniżej.
Wbudowane metryki z StreamingQueryProgress
Następujące metryki są uwzględniane w StreamingQueryProgress zdarzeniu, które jest automatycznie rejestrowane w dziennikach sterowników. Dostęp do nich można również uzyskać za pośrednictwem funkcji wywołania zwrotnego StreamingQueryListeneronQueryProgress().
QueryProgressEvent.json() lub toString() uwzględniają dodatkowe metryki trybu w czasie rzeczywistym.
- Opóźnienie przetwarzania (processingLatencyMs). Czas, który upłynął między momentem, gdy zapytanie w trybie rzeczywistym odczytuje rekord, a momentem, gdy zapytanie zapisuje go w kolejnym etapie przetwarzania. W przypadku zapytań jednoetapowych oznacza to ten sam czas trwania co opóźnienie E2E. System raportuje tę metrykę na każde zadanie.
- Źródłowe opóźnienie kolejkowania (sourceQueuingLatencyMs). Czas, jaki mija od momentu zapisania rekordu przez system w magistrali komunikatów, na przykład czas dołączenia dziennika w Kafka, zanim zapytanie w trybie czasu rzeczywistego po raz pierwszy odczyta rekord. System raportuje tę metrykę na każde zadanie.
- Opóźnienie E2E (e2eLatencyMs). Czas między momentem, kiedy system zapisuje rekord do magistrali komunikatów, a momentem, kiedy zapytanie w trybie rzeczywistym zapisuje rekord dalej w procesie. System agreguje tę metrykę na każdą partię we wszystkich rekordach przetwarzanych przez wszystkie zadania.
Przykład:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Dedykowany pomiar opóźnienia za pomocą API Observe
Interfejs API obserwowania pomaga zmierzyć opóźnienie bez uruchamiania innego zadania. Jeśli masz sygnaturę czasową źródła, która przybliża czas przybycia danych źródłowych, możesz oszacować opóźnienie każdej partii przy użyciu interfejsu API obserwowania. Przekaż znacznik czasu przed dotarciem do ujścia:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
W tym przykładzie bieżący znacznik czasu jest rejestrowany przed wyprowadzeniem wpisu, a opóźnienie jest szacowane przez obliczenie różnicy między tym znacznikiem czasu a znacznikiem czasu źródłowego rekordu. Wyniki są uwzględniane w raportach postępu i udostępniane odbiornikom. Oto przykładowe dane wyjściowe:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Obsługa funkcji i ograniczenia
W tej sekcji opisano obsługiwane funkcje i bieżące ograniczenia trybu w czasie rzeczywistym, w tym zgodne środowiska, języki, źródła, ujścia, operatory i specjalne zagadnienia dotyczące określonych funkcji.
Obsługiwane środowiska, języki i tryby
Obsługiwane języki: Tryb czasu rzeczywistego obsługuje języki Scala, Java i Python.
Obsługiwane typy obliczeń:
| Typ środowiska obliczeniowego | Supported |
|---|---|
| Dedykowany (dawniej: dla pojedynczego użytkownika) | ✓ |
| Standardowa (dawniej: udostępnione) | √ (tylko język Python) |
| Lakeflow Spark Deklaratywne potoki klasyczne | Niewspierane |
| Deklaratywne potoki Lakeflow Spark bezserwerowe | Niewspierane |
| Serverless | Niewspierane |
Obsługiwane tryby wykonywania:
| Tryb wykonywania | Supported |
|---|---|
| Tryb aktualizacji | ✓ |
| tryb dołączania | Niewspierane |
| Tryb pełny | Niewspierane |
Obsługa źródła i ujścia
| Źródło lub ujście | Jako źródło | Jako ujście |
|---|---|---|
| Apache Kafka | ✓ | ✓ |
| Event Hubs (przy użyciu łącznika Kafka) | ✓ | ✓ |
| Kinesis | √ (tylko tryb EFO) | Niewspierane |
| AWS MSK | ✓ | Niewspierane |
| Delta | Niewspierane | Niewspierane |
| Google Pub/Sub (usługa przesyłania wiadomości) | Niewspierane | Niewspierane |
| Apache Pulsar | Niewspierane | Niewspierane |
Dowolne ujścia (przy użyciu forEachWriter) |
Nie dotyczy | ✓ |
Obsługiwane operatory
| Operators | Supported |
|---|---|
| Operacje bezstanowe | |
| Selection | ✓ |
| Projection | ✓ |
| funkcje zdefiniowane przez użytkownika (UDF) | |
| Scala UDF | √ (z pewnymi ograniczeniami) |
| Python UDF | √ (z pewnymi ograniczeniami) |
| Agregacja | |
| sum | ✓ |
| count | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| Funkcje agregacji | ✓ |
| Windowing | |
| Tumbling | ✓ |
| Sliding | ✓ |
| Session | Niewspierane |
| Deduplikacja | |
| usuńDuplikaty | √ (stan jest niezwiązany) |
| UsuńDuplikatyWZnakuWodnym | Niewspierane |
| Strumień / Łączenie tabel | |
| Tabela nadawania (powinna być mała) | ✓ |
| Strumień — przyłączanie do strumienia | Niewspierane |
| (płaskie)MapGroupsWithState | Niewspierane |
| transformWithState | √ (z pewnymi różnicami) |
| związek | √ (z pewnymi ograniczeniami) |
| forEach | ✓ |
| forEachBatch | Niewspierane |
| mapPartitions | Nieobsługiwane (zobacz ograniczenie) |
Uwagi specjalne
Niektóre operatory i funkcje mają konkretne zagadnienia lub różnice w przypadku użycia w trybie czasu rzeczywistego.
transformWithState w trybie czasu rzeczywistego
W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.
Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.
- Tryb czasu rzeczywistego wywołuje metodę
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)dla każdego wiersza.- Iterator
inputRowszwraca pojedynczą wartość. Tryb mikrosadowy wywołuje go raz dla każdego klucza, a iteratorinputRowszwraca wszystkie wartości dla danego klucza w danej mikrosadowej partii. - Należy pamiętać o tej różnicy podczas pisania kodu.
- Iterator
- Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
- W trybie czasu rzeczywistego zegary są opóźnione w uruchamianiu w zależności od przybycia danych.
- Jeśli czasomierz jest zaplanowany na 10:00:00, ale żadne dane nie docierają, czasomierz nie jest uruchamiany natychmiast.
- Jeśli dane pojawią się o godzinie 10:00:10, czasomierz zostanie wyzwolony z 10-sekundowym opóźnieniem.
- Jeśli żadne dane nie docierają, a długotrwała partia kończy działanie, czasomierz jest uruchamiany przed zakończeniem partii.
Funkcje zdefiniowane przez użytkownika języka Python w trybie czasu rzeczywistego
Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:
| Kategoria | Typ UDF | Supported |
|---|---|---|
| Bezstanowa | Funkcja UDF skalarna języka Python (funkcje skalarne zdefiniowane przez użytkownika — Python) | ✓ |
| Bezstanowa | Funkcja skalarna Arrow zdefiniowana przez użytkownika (UDF) | ✓ |
| Bezstanowa | Funkcja UDF skalarna biblioteki Pandas (funkcje zdefiniowane przez użytkownika biblioteki Pandas) | ✓ |
| Bezstanowa | Funkcja strzałkowa (mapInArrow) |
✓ |
| Bezstanowa | Funkcja Pandas (Map) | ✓ |
| Grupowanie stanowe (UDAF) |
transformWithState (Row tylko interfejs) |
✓ |
| Grupowanie stanowe (UDAF) | applyInPandasWithState |
Niewspierane |
| Grupowanie niestanowe (UDAF) | apply |
Niewspierane |
| Grupowanie niestanowe (UDAF) | applyInArrow |
Niewspierane |
| Grupowanie niestanowe (UDAF) | applyInPandas |
Niewspierane |
| Funkcja tabeli | UDTF (funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs)) | Niewspierane |
| Funkcja tabeli | UC UDF | Niewspierane |
Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:
- Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (
spark.sql.execution.arrow.maxRecordsPerBatch) na 1.- Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
- Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
- Funkcje Pandas UDF oraz inne funkcje nie działają dobrze z rozmiarem partii Arrow ustawionym na 1.
- Jeśli używasz UDF-ów lub funkcji pandas, ustaw rozmiar partii Arrow na wyższą wartość (na przykład 100 lub więcej).
- Należy pamiętać, że oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji Arrow UDF, jeśli to możliwe.
- Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie
Row.
Limitations
Ograniczenia źródła
W przypadku kinesis tryb czasu rzeczywistego nie obsługuje trybu sondowania. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.
Ograniczenia unii
Operator Unii ma pewne ograniczenia:
- Tryb czasu rzeczywistego nie obsługuje samodzielnej unii:
- Kafka: Nie można używać tego samego obiektu ramki danych źródłowych do łączenia pochodnych ramek danych z niego. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
- Kinesis: nie można połączyć ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
- Tryb czasu rzeczywistego nie obsługuje operatorów stanowych (na przykład
aggregate, ,deduplicatetransformWithState) zdefiniowanych przed Unią. - Tryb czasu rzeczywistego nie obsługuje łączenia ze źródłami wsadowymi.
Ograniczenie usługi MapPartitions
mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) wykonaj iterator całej partycji wejściowej i utwórz iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.
Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.
Następne kroki
Teraz, gdy już wiesz, jaki jest tryb czasu rzeczywistego i jak go skonfigurować, zapoznaj się z tymi zasobami, aby rozpocząć implementowanie aplikacji przesyłania strumieniowego w czasie rzeczywistym:
- Wprowadzenie do trybu czasu rzeczywistego — postępuj zgodnie z instrukcjami krok po kroku, aby skonfigurować obliczenia i uruchomić pierwsze zapytanie przesyłania strumieniowego w czasie rzeczywistym.
- Przykłady kodu trybu w czasie rzeczywistym — zapoznaj się z przykładami roboczymi, takimi jak źródła i ujścia platformy Kafka, zapytania stanowe, agregacje i ujścia niestandardowe.
- Pojęcia dotyczące przesyłania strumieniowego ze strukturą — poznaj podstawowe pojęcia dotyczące przesyłania strumieniowego ze strukturą w usłudze Databricks.