Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie opisano sposób korzystania z trybu czasu rzeczywistego z przesyłaniem strumieniowym ze strukturą, w tym z tym, co to jest i jak działa.
Aby uzyskać instrukcje dotyczące konfiguracji krok po kroku, zobacz Wprowadzenie do trybu w czasie rzeczywistym. Przykłady kodu można znaleźć w temacie Przykłady trybu czasu rzeczywistego. Aby uzyskać informacje o obsługiwanych źródłach, ujściach, operatorach i ograniczeniach, zobacz Informacje o trybie czasu rzeczywistego.
Co to jest tryb czasu rzeczywistego?
Tryb czasu rzeczywistego jest typem wyzwalacza dla struktur przetwarzania strumieniowego, który umożliwia przetwarzanie danych z ultra niskimi opóźnieniami, a całkowite opóźnienie może być nawet pięć milisekund. Użyj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw, personalizacja w czasie rzeczywistym i natychmiastowe systemy podejmowania decyzji.
Obciążenia operacyjne i analityczne
Obciążenia przesyłania strumieniowego mogą być szeroko podzielone na obciążenia operacyjne i obciążenia analityczne:
- Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.
- Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).
Oto kilka przykładów obciążeń operacyjnych:
- Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
- Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.
Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się wymogiem opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.
Jak tryb czasu rzeczywistego osiąga małe opóźnienie
Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:
- Wykonywanie długotrwałych partii (wartość domyślna to pięć minut), w których system przetwarza dane w miarę ich udostępniania w źródle.
- Planowanie wszystkich etapów zapytania jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
- Przekazywanie danych między etapami zaraz po ich utworzeniu przy użyciu metody przesyłania strumieniowego shuffle.
Na końcu przetwarzania partii i przed rozpoczęciem następnej partii, Structured Streaming wykonuje punkty kontrolne postępu i publikuje metryki. Czas trwania partii wpływa na częstotliwość tworzenia punktów kontrolnych:
- Dłuższe partie: rzadsze zapisywanie punktów kontrolnych, co oznacza dłuższe powtarzanie w przypadku awarii i opóźnioną dostępność wyników pomiarów.
- Krótsze partie: częstsze tworzenie punktów kontrolnych, co może mieć wpływ na opóźnienie.
Databricks zaleca przeprowadzenie testów wydajności w trybie rzeczywistym w odniesieniu do docelowego obciążenia, aby znaleźć odpowiedni interwał wyzwalania.
Kiedy używać trybu czasu rzeczywistego
Wybierz tryb czasu rzeczywistego, gdy przypadek użycia wymaga:
- Opóźnienie podrzędne: aplikacje, które muszą reagować na dane w milisekundach, takie jak systemy wykrywania oszustw, które muszą blokować transakcje w czasie rzeczywistym.
- Podejmowanie decyzji operacyjnych: Systemy, które wyzwalają natychmiastowe akcje na podstawie danych przychodzących, takich jak oferty w czasie rzeczywistym, alerty lub powiadomienia.
- Ciągłe przetwarzanie: obciążenia, w których dane muszą być przetwarzane natychmiast po ich nadejściu, a nie w okresowych partiach.
Użyj trybu mikrosadowego (domyślnego wyzwalacza przesyłania strumieniowego ze strukturą), gdy:
- Przetwarzanie analityczne: potoki ETL, przekształcenia danych i implementacje architektury medalonu, w których wymagania dotyczące opóźnienia są mierzone w sekundach lub minutach.
- Optymalizacja kosztów: obciążenia, w których opóźnienie podrzędne sekundy nie jest wymagane, ponieważ tryb czasu rzeczywistego wymaga dedykowanych zasobów obliczeniowych.
- Częstotliwość punktów kontrolnych ma znaczenie: aplikacje, które korzystają z częstszego tworzenia punktów kontrolnych w celu szybszego odzyskiwania.
Wymagania i konfiguracja
Tryb czasu rzeczywistego ma określone wymagania dotyczące konfiguracji obliczeń i konfiguracji zapytań. W tej sekcji opisano wymagania wstępne i kroki konfiguracji wymagane do korzystania z trybu czasu rzeczywistego.
Warunki wstępne
Aby korzystać z trybu czasu rzeczywistego, należy skonfigurować zasoby obliczeniowe, aby spełniały następujące wymagania:
- Databricks Runtime 16.4 LTS lub nowszy: tryb czasu rzeczywistego jest dostępny tylko w wersji DBR 16.4 LTS i nowszych.
- Dedykowane zasoby obliczeniowe: należy użyć dedykowanego (wcześniej przeznaczonego dla jednego użytkownika) zasobu obliczeniowego. Standardowe (wcześniej udostępnione), deklaratywne potoki Lakeflow Spark i klastry bezserwerowe nie są obsługiwane.
- Wyłącz skalowanie automatyczne: skalowanie automatyczne nie jest obsługiwane.
- Wyłącz Photon: Przyspieszanie Photon nie jest obsługiwane.
-
Konfiguracja platformy Spark: musisz ustawić wartość
spark.databricks.streaming.realTimeMode.enabledtrue.
Aby uzyskać instrukcje krok po kroku dotyczące tworzenia i konfigurowania zasobów obliczeniowych dla trybu czasu rzeczywistego, zobacz Wprowadzenie do trybu czasu rzeczywistego.
Konfiguracja zapytania
Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Ustalanie rozmiaru zasobów obliczeniowych
Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.
Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.
Przykłady obliczeń miejsca
| Typ potoku | Konfiguracja | Wymagane miejsca |
|---|---|---|
| Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) |
maxPartitions = 8 |
8 miejsc |
| Dwuetapowe stanowiskowe (źródło Kafka + shuffle) |
maxPartitions = 8, partycje mieszania = 20 |
28 miejsc (8 + 20) |
| Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) |
maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów |
48 gniazd (8 + 20 + 20) |
Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.
Wydajność
Aby uzyskać wskazówki dotyczące dostrajania zasobów obliczeniowych, techniki optymalizacji opóźnień i monitorowanie zapytań, zobacz Optymalizowanie i monitorowanie wydajności zapytań w trybie czasu rzeczywistego.
Obsługa funkcji i ograniczenia
Aby uzyskać pełną listę obsługiwanych środowisk, języków, typów obliczeniowych, źródeł, ujść, operatorów i znanych ograniczeń, zobacz Informacje o trybie czasu rzeczywistego.
Następne kroki
Teraz, gdy już wiesz, jaki jest tryb czasu rzeczywistego i jak go skonfigurować, zapoznaj się z tymi zasobami, aby rozpocząć implementowanie aplikacji przesyłania strumieniowego w czasie rzeczywistym:
- Wprowadzenie do trybu czasu rzeczywistego: postępuj zgodnie z instrukcjami krok po kroku, aby skonfigurować obliczenia i uruchomić pierwsze zapytanie przesyłania strumieniowego w czasie rzeczywistym.
- Przykłady kodu trybu w czasie rzeczywistym: zapoznaj się z przykładami roboczymi, takimi jak źródła i ujścia platformy Kafka, zapytania stanowe, agregacje i ujścia niestandardowe.
- Wydajność; Dostosuj obliczenia, zmniejsz opóźnienie za pomocą technik optymalizacji asynchronicznej i zmierz wydajność za pomocą wbudowanych metryk.
- Dokumentacja trybu w czasie rzeczywistym: Przejrzyj obsługiwane środowiska, języki, źródła, ujścia, operatory i znane ograniczenia.
- Pojęcia dotyczące przesyłania strumieniowego ze strukturą: poznaj podstawowe pojęcia dotyczące przesyłania strumieniowego ze strukturą w usłudze Databricks.