Nastavení režimu v reálném čase

Tato stránka popisuje požadavky a konfiguraci potřebné ke spouštění dotazů v režimu v reálném čase ve strukturovaném streamování. Podrobný kurz najdete v kurzu : Spuštění úlohy streamování v reálném čase. Koncepční informace o režimu v reálném čase najdete v režimu v reálném čase ve strukturovaném streamování.

Požadavky

Pokud chcete používat režim v reálném čase, musíte nakonfigurovat výpočetní prostředky tak, aby splňovaly následující požadavky:

  • Používejte klasické výpočetní prostředky. Podporují se vyhrazené a standardní režimy přístupu. Režim standardního přístupu je podporován pouze pro Python. Deklarativní kanály Sparku Lakeflow a bezserverové clustery se nepodporují.
  • Použijte Databricks Runtime 16.4 LTS a vyšší.
  • Vypněte automatické škálování.
  • Vypněte Photon.
  • Nastavte spark.databricks.streaming.realTimeMode.enabled na hodnotu true.
  • Vypněte spotové instance, abyste se vyhnuli přerušení.

V případě úloh citlivých na latenci s funkcemi definovanými uživatelem doporučuje Databricks používat vyhrazený režim přístupu. Viz Funkce tabulky.

Pokyny k vytváření a konfiguraci klasických výpočetních prostředků najdete v referenčních informacích ke konfiguraci výpočetních prostředků.

Konfigurace dotazu

Pokud chcete spustit dotaz v režimu v reálném čase, musíte povolit trigger v reálném čase. Triggery v reálném čase se podporují jenom v režimu aktualizace.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Stanovení velikosti výpočetní kapacity

Pokud výpočetní prostředky mají dostatek slotů úloh, můžete spustit jednu úlohu v reálném čase na výpočetní prostředek.

Pokud chcete běžet v režimu s nízkou latencí, musí být celkový počet dostupných slotů úloh větší nebo roven počtu úkolů ve všech fázích dotazu.

Příklady výpočtů slotů

Typ kanálu Konfigurace Požadované sloty
Jednostupňová bezstavová konfigurace (zdroj Kafka + výstup) maxPartitions = 8 8 slotů
Stavová dvoufázová fáze (zdroj Kafka + shuffle) maxPartitions = 8, prohazovací oddíly = 20 28 slotů (8 + 20)
Třístupňový (zdroj Kafka + shuffle + přerozdělení) maxPartitions = 8, dvě fáze mixování po 20 každá 48 slotů (8 + 20 + 20)

Pokud nenastavíte maxPartitions, použijte počet partitionů v tématu Kafka.

Dodatečné zdroje