Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie opisano wymagania wstępne i konfigurację wymaganą do uruchamiania zapytań w trybie czasu rzeczywistego w funkcji przesyłania strumieniowego ze strukturą. Aby zapoznać się z samouczkiem krok po kroku, odwiedź Samouczek: uruchamianie zadania przesyłania strumieniowego w czasie rzeczywistym. Aby uzyskać informacje koncepcyjne na temat trybu czasu rzeczywistego, zobacz Tryb czasu rzeczywistego w Structured Streaming.
Warunki wstępne
Aby korzystać z trybu czasu rzeczywistego, należy skonfigurować zasoby obliczeniowe, aby spełniały następujące wymagania:
- Użyj klasycznych obliczeń. Obsługiwane są dedykowane i standardowe tryby dostępu. Tryb standardowego dostępu jest obsługiwany tylko dla Python. Potoki deklaratywne i klastry bezserwerowe usługi Lakeflow Spark nie są obsługiwane.
- Użyj środowiska Databricks Runtime 16.4 LTS lub nowszego.
- Wyłącz skalowanie automatyczne.
- Wyłącz photon.
- Ustaw opcję
spark.databricks.streaming.realTimeMode.enablednatrue. - Wyłącz wystąpienia typu spot, aby uniknąć przerw.
W przypadku obciążeń wrażliwych na opóźnienia z funkcjami UDF usługa Databricks zaleca korzystanie z dedykowanego trybu dostępu. Zobacz Funkcje tabeli.
Aby uzyskać instrukcje dotyczące tworzenia i konfigurowania klasycznych obliczeń, zobacz Dokumentacja konfiguracji obliczeń.
Konfiguracja zapytania
Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala - język programowania
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Ustalanie rozmiaru zasobów obliczeniowych
Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.
Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.
Przykłady obliczeń miejsca
| Typ potoku | Konfiguracja | Wymagane miejsca |
|---|---|---|
| Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) |
maxPartitions = 8 |
8 miejsc |
| Dwuetapowe stanowiskowe (źródło Kafka + shuffle) |
maxPartitions = 8, partycje mieszania = 20 |
28 miejsc (8 + 20) |
| Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) |
maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów |
48 gniazd (8 + 20 + 20) |
Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.