Konfigurowanie trybu czasu rzeczywistego

Na tej stronie opisano wymagania wstępne i konfigurację wymaganą do uruchamiania zapytań w trybie czasu rzeczywistego w funkcji przesyłania strumieniowego ze strukturą. Aby zapoznać się z samouczkiem krok po kroku, odwiedź Samouczek: uruchamianie zadania przesyłania strumieniowego w czasie rzeczywistym. Aby uzyskać informacje koncepcyjne na temat trybu czasu rzeczywistego, zobacz Tryb czasu rzeczywistego w Structured Streaming.

Warunki wstępne

Aby korzystać z trybu czasu rzeczywistego, należy skonfigurować zasoby obliczeniowe, aby spełniały następujące wymagania:

  • Użyj klasycznych obliczeń. Obsługiwane są dedykowane i standardowe tryby dostępu. Tryb standardowego dostępu jest obsługiwany tylko dla Python. Potoki deklaratywne i klastry bezserwerowe usługi Lakeflow Spark nie są obsługiwane.
  • Użyj środowiska Databricks Runtime 16.4 LTS lub nowszego.
  • Wyłącz skalowanie automatyczne.
  • Wyłącz photon.
  • Ustaw opcję spark.databricks.streaming.realTimeMode.enabled na true.
  • Wyłącz wystąpienia typu spot, aby uniknąć przerw.

W przypadku obciążeń wrażliwych na opóźnienia z funkcjami UDF usługa Databricks zaleca korzystanie z dedykowanego trybu dostępu. Zobacz Funkcje tabeli.

Aby uzyskać instrukcje dotyczące tworzenia i konfigurowania klasycznych obliczeń, zobacz Dokumentacja konfiguracji obliczeń.

Konfiguracja zapytania

Aby uruchomić zapytanie w trybie czasu rzeczywistego, należy włączyć wyzwalacz w czasie rzeczywistym. Wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala - język programowania

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Ustalanie rozmiaru zasobów obliczeniowych

Możesz uruchomić jedno zadanie w czasie rzeczywistym na zasobie obliczeniowym, jeśli ma wystarczającą liczbę miejsc na zadania.

Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.

Przykłady obliczeń miejsca

Typ potoku Konfiguracja Wymagane miejsca
Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) maxPartitions = 8 8 miejsc
Dwuetapowe stanowiskowe (źródło Kafka + shuffle) maxPartitions = 8, partycje mieszania = 20 28 miejsc (8 + 20)
Trzy etapy (źródło Kafka + shuffle + ponowne partycjonowanie) maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów 48 gniazd (8 + 20 + 20)

Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.

Dodatkowe zasoby