Udostępnij za pośrednictwem


Tryb czasu rzeczywistego w streamingu ustrukturyzowanym

Na tej stronie opisano sposób korzystania z trybu czasu rzeczywistego z przesyłaniem strumieniowym ze strukturą, w tym z tym, co to jest i jak działa.

Aby uzyskać instrukcje dotyczące konfiguracji krok po kroku, zobacz Wprowadzenie do trybu w czasie rzeczywistym. Przykłady kodu można znaleźć w temacie Przykłady trybu czasu rzeczywistego. Aby uzyskać informacje o obsługiwanych źródłach, ujściach, operatorach i ograniczeniach, zobacz Informacje o trybie czasu rzeczywistego.

Co to jest tryb czasu rzeczywistego?

Tryb czasu rzeczywistego jest typem wyzwalacza dla struktur przetwarzania strumieniowego, który umożliwia przetwarzanie danych z ultra niskimi opóźnieniami, a całkowite opóźnienie może być nawet pięć milisekund. Użyj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw, personalizacja w czasie rzeczywistym i natychmiastowe systemy podejmowania decyzji.

Obciążenia operacyjne i analityczne

Obciążenia przesyłania strumieniowego mogą być szeroko podzielone na obciążenia operacyjne i obciążenia analityczne:

  • Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.
  • Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).

Oto kilka przykładów obciążeń operacyjnych:

  • Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
  • Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.

Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się wymogiem opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.

Jak tryb czasu rzeczywistego osiąga małe opóźnienie

Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:

  • Wykonywanie długotrwałych partii (wartość domyślna to pięć minut), w których system przetwarza dane w miarę ich udostępniania w źródle.
  • Planowanie wszystkich etapów zapytania jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
  • Przekazywanie danych między etapami zaraz po ich utworzeniu przy użyciu metody przesyłania strumieniowego shuffle.

Na końcu przetwarzania partii i przed rozpoczęciem następnej partii, Structured Streaming wykonuje punkty kontrolne postępu i publikuje metryki. Czas trwania partii wpływa na częstotliwość tworzenia punktów kontrolnych:

  • Dłuższe partie: rzadsze zapisywanie punktów kontrolnych, co oznacza dłuższe powtarzanie w przypadku awarii i opóźnioną dostępność wyników pomiarów.
  • Krótsze partie: częstsze tworzenie punktów kontrolnych, co może mieć wpływ na opóźnienie.

Databricks zaleca przeprowadzenie testów wydajności w trybie rzeczywistym w odniesieniu do docelowego obciążenia, aby znaleźć odpowiedni interwał wyzwalania.

Kiedy używać trybu czasu rzeczywistego

Wybierz tryb czasu rzeczywistego, gdy przypadek użycia wymaga:

  • Opóźnienie podrzędne: aplikacje, które muszą reagować na dane w milisekundach, takie jak systemy wykrywania oszustw, które muszą blokować transakcje w czasie rzeczywistym.
  • Podejmowanie decyzji operacyjnych: Systemy, które wyzwalają natychmiastowe akcje na podstawie danych przychodzących, takich jak oferty w czasie rzeczywistym, alerty lub powiadomienia.
  • Ciągłe przetwarzanie: obciążenia, w których dane muszą być przetwarzane natychmiast po ich nadejściu, a nie w okresowych partiach.

Użyj trybu mikrosadowego (domyślnego wyzwalacza przesyłania strumieniowego ze strukturą), gdy:

  • Przetwarzanie analityczne: potoki ETL, przekształcenia danych i implementacje architektury medalonu, w których wymagania dotyczące opóźnienia są mierzone w sekundach lub minutach.
  • Optymalizacja kosztów: obciążenia, w których opóźnienie podrzędne sekundy nie jest wymagane, ponieważ tryb czasu rzeczywistego wymaga dedykowanych zasobów obliczeniowych.
  • Częstotliwość punktów kontrolnych ma znaczenie: aplikacje, które korzystają z częstszego tworzenia punktów kontrolnych w celu szybszego odzyskiwania.

Wymagania i konfiguracja

Tryb czasu rzeczywistego ma określone wymagania dotyczące konfiguracji obliczeń i konfiguracji zapytań. W tej sekcji opisano wymagania wstępne i kroki konfiguracji wymagane do korzystania z trybu czasu rzeczywistego.

Warunki wstępne

Aby korzystać z trybu czasu rzeczywistego, należy skonfigurować zasoby obliczeniowe, aby spełniały następujące wymagania:

  • Databricks Runtime 16.4 LTS lub nowszy: tryb czasu rzeczywistego jest dostępny tylko w wersji DBR 16.4 LTS i nowszych.
  • Dedykowane zasoby obliczeniowe: należy użyć dedykowanego (wcześniej przeznaczonego dla jednego użytkownika) zasobu obliczeniowego. Standardowe (wcześniej udostępnione), deklaratywne potoki Lakeflow Spark i klastry bezserwerowe nie są obsługiwane.
  • Wyłącz skalowanie automatyczne: skalowanie automatyczne nie jest obsługiwane.
  • Wyłącz Photon: Przyspieszanie Photon nie jest obsługiwane.
  • Konfiguracja platformy Spark: musisz ustawić wartość spark.databricks.streaming.realTimeMode.enabledtrue.

Aby uzyskać instrukcje krok po kroku dotyczące tworzenia i konfigurowania zasobów obliczeniowych dla trybu czasu rzeczywistego, zobacz Wprowadzenie do trybu czasu rzeczywistego.

Konfiguracja zapytania

Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Ustalanie rozmiaru zasobów obliczeniowych

Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.

Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.

Przykłady obliczeń miejsca

Typ potoku Konfiguracja Wymagane miejsca
Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) maxPartitions = 8 8 miejsc
Dwuetapowe stanowiskowe (źródło Kafka + shuffle) maxPartitions = 8, partycje mieszania = 20 28 miejsc (8 + 20)
Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów 48 gniazd (8 + 20 + 20)

Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.

Wydajność

Aby uzyskać wskazówki dotyczące dostrajania zasobów obliczeniowych, techniki optymalizacji opóźnień i monitorowanie zapytań, zobacz Optymalizowanie i monitorowanie wydajności zapytań w trybie czasu rzeczywistego.

Obsługa funkcji i ograniczenia

Aby uzyskać pełną listę obsługiwanych środowisk, języków, typów obliczeniowych, źródeł, ujść, operatorów i znanych ograniczeń, zobacz Informacje o trybie czasu rzeczywistego.

Następne kroki

Teraz, gdy już wiesz, jaki jest tryb czasu rzeczywistego i jak go skonfigurować, zapoznaj się z tymi zasobami, aby rozpocząć implementowanie aplikacji przesyłania strumieniowego w czasie rzeczywistym: