Udostępnij za pośrednictwem


Tryb czasu rzeczywistego w streamingu ustrukturyzowanym

Tryb czasu rzeczywistego jest typem wyzwalacza dla struktur przetwarzania strumieniowego, który umożliwia przetwarzanie danych z ultra niskimi opóźnieniami, a całkowite opóźnienie może być nawet pięć milisekund. Użyj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw, personalizacja w czasie rzeczywistym i natychmiastowe systemy podejmowania decyzji.

Tryb czasu rzeczywistego jest dostępny w środowisku Databricks Runtime 16.4 LTS lub nowszym. Aby uzyskać instrukcje dotyczące konfiguracji krok po kroku, zobacz Wprowadzenie do trybu w czasie rzeczywistym. Przykłady kodu można znaleźć w temacie Przykłady trybu czasu rzeczywistego.

Co to jest tryb czasu rzeczywistego?

Obciążenia operacyjne i analityczne

Obciążenia przesyłania strumieniowego można szeroko podzielić na obciążenia analityczne i obciążenia operacyjne:

  • Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).
  • Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.

Oto kilka przykładów obciążeń operacyjnych:

  • Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
  • Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.

Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się wymogiem opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.

Jak tryb czasu rzeczywistego osiąga małe opóźnienie

Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:

  • Wykonywanie długotrwałych partii (wartość domyślna to pięć minut), w których system przetwarza dane w miarę ich udostępniania w źródle.
  • Planowanie wszystkich etapów zapytania jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
  • Przekazywanie danych między etapami zaraz po ich utworzeniu przy użyciu metody przesyłania strumieniowego shuffle.

Na końcu przetwarzania partii i przed rozpoczęciem następnej partii, Structured Streaming wykonuje punkty kontrolne postępu i publikuje metryki. Czas trwania partii wpływa na częstotliwość tworzenia punktów kontrolnych:

  • Dłuższe partie: rzadsze zapisywanie punktów kontrolnych, co oznacza dłuższe powtarzanie w przypadku awarii i opóźnioną dostępność wyników pomiarów.
  • Krótsze partie: częstsze tworzenie punktów kontrolnych, co może mieć wpływ na opóźnienie.

Databricks zaleca przeprowadzenie testów wydajności w trybie rzeczywistym w odniesieniu do docelowego obciążenia, aby znaleźć odpowiedni interwał wyzwalania.

Kiedy używać trybu czasu rzeczywistego

Wybierz tryb czasu rzeczywistego, gdy przypadek użycia wymaga:

  • Opóźnienie podrzędne: aplikacje, które muszą reagować na dane w milisekundach, takie jak systemy wykrywania oszustw, które muszą blokować transakcje w czasie rzeczywistym.
  • Podejmowanie decyzji operacyjnych: Systemy, które wyzwalają natychmiastowe akcje na podstawie danych przychodzących, takich jak oferty w czasie rzeczywistym, alerty lub powiadomienia.
  • Ciągłe przetwarzanie: obciążenia, w których dane muszą być przetwarzane natychmiast po ich nadejściu, a nie w okresowych partiach.

Użyj trybu mikrosadowego (domyślnego wyzwalacza przesyłania strumieniowego ze strukturą), gdy:

  • Przetwarzanie analityczne: potoki ETL, przekształcenia danych i implementacje architektury medalonu, w których wymagania dotyczące opóźnienia są mierzone w sekundach lub minutach.
  • Optymalizacja kosztów: obciążenia, w których opóźnienie podrzędne sekundy nie jest wymagane, ponieważ tryb czasu rzeczywistego wymaga dedykowanych zasobów obliczeniowych.
  • Częstotliwość punktów kontrolnych ma znaczenie: aplikacje, które korzystają z częstszego tworzenia punktów kontrolnych w celu szybszego odzyskiwania.

Wymagania i konfiguracja

Tryb czasu rzeczywistego ma określone wymagania dotyczące konfiguracji obliczeń i konfiguracji zapytań. W tej sekcji opisano wymagania wstępne i kroki konfiguracji wymagane do korzystania z trybu czasu rzeczywistego.

Wymagania wstępne

Aby korzystać z trybu czasu rzeczywistego, należy spełnić następujące wymagania:

  • Databricks Runtime 16.4 LTS lub nowszy: tryb czasu rzeczywistego jest dostępny tylko w wersji DBR 16.4 LTS i nowszych.
  • Dedykowane zasoby obliczeniowe: należy użyć dedykowanego (wcześniej przeznaczonego dla jednego użytkownika) zasobu obliczeniowego. Standardowe (wcześniej udostępnione), deklaratywne potoki Lakeflow Spark i klastry bezserwerowe nie są obsługiwane.
  • Brak skalowania automatycznego: skalowanie automatyczne musi być wyłączone.
  • Brak Photon: przyspieszanie Photon nie jest obsługiwane w trybie czasu rzeczywistego.
  • Konfiguracja platformy Spark: musisz ustawić wartość spark.databricks.streaming.realTimeMode.enabledtrue.

Konfiguracja obliczeniowa

Skonfiguruj obliczenia przy użyciu następujących ustawień:

  • Ustaw spark.databricks.streaming.realTimeMode.enabled na true w konfiguracji Spark.
  • Wyłącz skalowanie automatyczne.
  • Wyłącz przyspieszanie Photon.
  • Upewnij się, że zasoby obliczeniowe są skonfigurowane jako klaster dedykowany (a nie standardowy, Lakeflow Spark Declarative Pipelines ani bezserwerowy).

Aby uzyskać instrukcje krok po kroku dotyczące tworzenia i konfigurowania zasobów obliczeniowych dla trybu czasu rzeczywistego, zobacz Wprowadzenie do trybu czasu rzeczywistego.

Konfiguracja zapytania

Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Ustalanie rozmiaru zasobów obliczeniowych

Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.

Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.

Przykłady obliczeń miejsca

Typ potoku Konfiguracja Wymagane miejsca
Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) maxPartitions = 8 8 miejsc
Dwuetapowe stanowiskowe (źródło Kafka + shuffle) maxPartitions = 8, partycje mieszania = 20 28 miejsc (8 + 20)
Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów 48 gniazd (8 + 20 + 20)

Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.

Kluczowe zagadnienia

Podczas konfigurowania obliczeń należy wziąć pod uwagę następujące kwestie:

  • W przeciwieństwie do trybu mikrosadowego, zadania w czasie rzeczywistym mogą pozostawać bezczynne podczas oczekiwania na dane, więc odpowiednie dopasowanie rozmiaru jest niezbędne, aby uniknąć marnowania zasobów.
  • Celuj w celu osiągnięcia docelowego poziomu wykorzystania (na przykład 50%) przez dostrajanie:
    • maxPartitions (dla platformy Kafka)
    • spark.sql.shuffle.partitions (dla etapów tasowania)
  • Usługa Databricks zaleca ustawienie maxPartitions tak, aby każde zadanie obsługiwało wiele partycji platformy Kafka w celu zmniejszenia obciążenia.
  • Dostosuj gniazda zadań przypadające na pracownika, aby dopasować obciążenie pracą do prostych zadań jednofazowych.
  • W przypadku zadań wymagających intensywnego mieszania eksperymentuj, aby znaleźć minimalną liczbę partycji mieszania, które unikają opóźnień, a następnie dostosuj wartość początkową. Obliczenia nie będą planować zadania, jeśli nie ma wystarczającej liczby miejsc.

Note

W środowisku Databricks Runtime 16.4 LTS i nowszym wszystkie potoki przetwarzania w czasie rzeczywistym używają punktów kontrolnych w wersji 2 (checkpoint v2), co umożliwia bezproblemowe przełączanie między trybami czasu rzeczywistego i mikrosadowego.

Techniki optymalizacji

Użyj następujących technik, aby zmniejszyć opóźnienie w trybie czasu rzeczywistego:

  • Śledzenie postępu asynchronicznego: przenosi zapisy do dzienników przesunięcia i zatwierdzania do wątku asynchronicznego, co zmniejsza czas międzysadowy dla zapytań bezstanowych.
  • Asynchroniczne tworzenie punktów kontrolnych stanu: rozpoczyna przetwarzanie następnej mikropartii natychmiast po zakończeniu obliczeń, bez potrzeby oczekiwania na tworzenie punktów kontrolnych stanu, co zmniejsza opóźnienie zapytań stanowych.

Note

Żadna technika nie jest domyślnie włączona. Należy je włączyć oddzielnie.

monitorowanie i obserwowanie

Mierzenie wydajności zapytań jest niezbędne w przypadku obciążeń w czasie rzeczywistym. W trybie czasu rzeczywistego tradycyjne metryki procesów wsadowych nie odzwierciedlają rzeczywistego opóźnienia, więc konieczne są alternatywne podejścia.

Kompleksowe opóźnienie jest specyficzne dla obciążenia, a czasami może być dokładnie mierzone tylko za pomocą logiki biznesowej. Jeśli na przykład źródłowy znacznik czasu jest przesyłany w Kafka, możesz obliczyć opóźnienie jako różnicę między sygnaturą czasową danych wyjściowych w Kafka a sygnaturą czasową źródła.

Możesz również oszacować kompleksowe opóźnienie przy użyciu wbudowanych metryk i interfejsów API opisanych poniżej.

Wbudowane metryki z StreamingQueryProgress

Następujące metryki są uwzględniane w StreamingQueryProgress zdarzeniu, które jest automatycznie rejestrowane w dziennikach sterowników. Dostęp do nich można również uzyskać za pośrednictwem funkcji wywołania zwrotnego StreamingQueryListeneronQueryProgress(). QueryProgressEvent.json() lub toString() uwzględniają dodatkowe metryki trybu w czasie rzeczywistym.

  1. Opóźnienie przetwarzania (processingLatencyMs). Czas, który upłynął między momentem, gdy zapytanie w trybie rzeczywistym odczytuje rekord, a momentem, gdy zapytanie zapisuje go w kolejnym etapie przetwarzania. W przypadku zapytań jednoetapowych oznacza to ten sam czas trwania co opóźnienie E2E. System raportuje tę metrykę na każde zadanie.
  2. Źródłowe opóźnienie kolejkowania (sourceQueuingLatencyMs). Czas, jaki mija od momentu zapisania rekordu przez system w magistrali komunikatów, na przykład czas dołączenia dziennika w Kafka, zanim zapytanie w trybie czasu rzeczywistego po raz pierwszy odczyta rekord. System raportuje tę metrykę na każde zadanie.
  3. Opóźnienie E2E (e2eLatencyMs). Czas między momentem, kiedy system zapisuje rekord do magistrali komunikatów, a momentem, kiedy zapytanie w trybie rzeczywistym zapisuje rekord dalej w procesie. System agreguje tę metrykę na każdą partię we wszystkich rekordach przetwarzanych przez wszystkie zadania.

Przykład:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Dedykowany pomiar opóźnienia za pomocą API Observe

Interfejs API obserwowania pomaga zmierzyć opóźnienie bez uruchamiania innego zadania. Jeśli masz sygnaturę czasową źródła, która przybliża czas przybycia danych źródłowych, możesz oszacować opóźnienie każdej partii przy użyciu interfejsu API obserwowania. Przekaż znacznik czasu przed dotarciem do ujścia:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

W tym przykładzie bieżący znacznik czasu jest rejestrowany przed wyprowadzeniem wpisu, a opóźnienie jest szacowane przez obliczenie różnicy między tym znacznikiem czasu a znacznikiem czasu źródłowego rekordu. Wyniki są uwzględniane w raportach postępu i udostępniane odbiornikom. Oto przykładowe dane wyjściowe:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Obsługa funkcji i ograniczenia

W tej sekcji opisano obsługiwane funkcje i bieżące ograniczenia trybu w czasie rzeczywistym, w tym zgodne środowiska, języki, źródła, ujścia, operatory i specjalne zagadnienia dotyczące określonych funkcji.

Obsługiwane środowiska, języki i tryby

Obsługiwane języki: Tryb czasu rzeczywistego obsługuje języki Scala, Java i Python.

Obsługiwane typy obliczeń:

Typ środowiska obliczeniowego Supported
Dedykowany (dawniej: dla pojedynczego użytkownika)
Standardowa (dawniej: udostępnione) √ (tylko język Python)
Lakeflow Spark Deklaratywne potoki klasyczne Niewspierane
Deklaratywne potoki Lakeflow Spark bezserwerowe Niewspierane
Serverless Niewspierane

Obsługiwane tryby wykonywania:

Tryb wykonywania Supported
Tryb aktualizacji
tryb dołączania Niewspierane
Tryb pełny Niewspierane

Obsługa źródła i ujścia

Źródło lub ujście Jako źródło Jako ujście
Apache Kafka
Event Hubs (przy użyciu łącznika Kafka)
Kinesis √ (tylko tryb EFO) Niewspierane
AWS MSK Niewspierane
Delta Niewspierane Niewspierane
Google Pub/Sub (usługa przesyłania wiadomości) Niewspierane Niewspierane
Apache Pulsar Niewspierane Niewspierane
Dowolne ujścia (przy użyciu forEachWriter) Nie dotyczy

Obsługiwane operatory

Operators Supported
Operacje bezstanowe
Selection
Projection
funkcje zdefiniowane przez użytkownika (UDF)
Scala UDF √ (z pewnymi ograniczeniami)
Python UDF √ (z pewnymi ograniczeniami)
Agregacja
sum
count
max
min
avg
Funkcje agregacji
Windowing
Tumbling
Sliding
Session Niewspierane
Deduplikacja
usuńDuplikaty √ (stan jest niezwiązany)
UsuńDuplikatyWZnakuWodnym Niewspierane
Strumień / Łączenie tabel
Tabela nadawania (powinna być mała)
Strumień — przyłączanie do strumienia Niewspierane
(płaskie)MapGroupsWithState Niewspierane
transformWithState √ (z pewnymi różnicami)
związek √ (z pewnymi ograniczeniami)
forEach
forEachBatch Niewspierane
mapPartitions Nieobsługiwane (zobacz ograniczenie)

Uwagi specjalne

Niektóre operatory i funkcje mają konkretne zagadnienia lub różnice w przypadku użycia w trybie czasu rzeczywistego.

transformWithState w trybie czasu rzeczywistego

W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.

Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.

  • Tryb czasu rzeczywistego wywołuje metodę handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) dla każdego wiersza.
    • Iterator inputRows zwraca pojedynczą wartość. Tryb mikrosadowy wywołuje go raz dla każdego klucza, a iterator inputRows zwraca wszystkie wartości dla danego klucza w danej mikrosadowej partii.
    • Należy pamiętać o tej różnicy podczas pisania kodu.
  • Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
  • W trybie czasu rzeczywistego zegary są opóźnione w uruchamianiu w zależności od przybycia danych.
    • Jeśli czasomierz jest zaplanowany na 10:00:00, ale żadne dane nie docierają, czasomierz nie jest uruchamiany natychmiast.
    • Jeśli dane pojawią się o godzinie 10:00:10, czasomierz zostanie wyzwolony z 10-sekundowym opóźnieniem.
    • Jeśli żadne dane nie docierają, a długotrwała partia kończy działanie, czasomierz jest uruchamiany przed zakończeniem partii.

Funkcje zdefiniowane przez użytkownika języka Python w trybie czasu rzeczywistego

Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:

Kategoria Typ UDF Supported
Bezstanowa Funkcja UDF skalarna języka Python (funkcje skalarne zdefiniowane przez użytkownika — Python)
Bezstanowa Funkcja skalarna Arrow zdefiniowana przez użytkownika (UDF)
Bezstanowa Funkcja UDF skalarna biblioteki Pandas (funkcje zdefiniowane przez użytkownika biblioteki Pandas)
Bezstanowa Funkcja strzałkowa (mapInArrow)
Bezstanowa Funkcja Pandas (Map)
Grupowanie stanowe (UDAF) transformWithState (Row tylko interfejs)
Grupowanie stanowe (UDAF) applyInPandasWithState Niewspierane
Grupowanie niestanowe (UDAF) apply Niewspierane
Grupowanie niestanowe (UDAF) applyInArrow Niewspierane
Grupowanie niestanowe (UDAF) applyInPandas Niewspierane
Funkcja tabeli UDTF (funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs)) Niewspierane
Funkcja tabeli UC UDF Niewspierane

Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:

  • Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
    • Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
    • Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
  • Funkcje Pandas UDF oraz inne funkcje nie działają dobrze z rozmiarem partii Arrow ustawionym na 1.
    • Jeśli używasz UDF-ów lub funkcji pandas, ustaw rozmiar partii Arrow na wyższą wartość (na przykład 100 lub więcej).
    • Należy pamiętać, że oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji Arrow UDF, jeśli to możliwe.
  • Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie Row .

Limitations

Ograniczenia źródła

W przypadku kinesis tryb czasu rzeczywistego nie obsługuje trybu sondowania. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.

Ograniczenia unii

Operator Unii ma pewne ograniczenia:

  • Tryb czasu rzeczywistego nie obsługuje samodzielnej unii:
    • Kafka: Nie można używać tego samego obiektu ramki danych źródłowych do łączenia pochodnych ramek danych z niego. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
    • Kinesis: nie można połączyć ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
  • Tryb czasu rzeczywistego nie obsługuje operatorów stanowych (na przykład aggregate, , deduplicatetransformWithState) zdefiniowanych przed Unią.
  • Tryb czasu rzeczywistego nie obsługuje łączenia ze źródłami wsadowymi.

Ograniczenie usługi MapPartitions

mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) wykonaj iterator całej partycji wejściowej i utwórz iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.

Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.

Następne kroki

Teraz, gdy już wiesz, jaki jest tryb czasu rzeczywistego i jak go skonfigurować, zapoznaj się z tymi zasobami, aby rozpocząć implementowanie aplikacji przesyłania strumieniowego w czasie rzeczywistym: