Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka popisuje požadavky a konfiguraci potřebné ke spouštění dotazů v režimu v reálném čase ve strukturovaném streamování. Podrobný kurz najdete v kurzu : Spuštění úlohy streamování v reálném čase. Koncepční informace o režimu v reálném čase najdete v režimu v reálném čase ve strukturovaném streamování.
Požadavky
Pokud chcete používat režim v reálném čase, musíte nakonfigurovat výpočetní prostředky tak, aby splňovaly následující požadavky:
- Používejte klasické výpočetní prostředky. Podporují se vyhrazené a standardní režimy přístupu. Režim standardního přístupu je podporován pouze pro Python. Deklarativní kanály Sparku Lakeflow a bezserverové clustery se nepodporují.
- Použijte Databricks Runtime 16.4 LTS a vyšší.
- Vypněte automatické škálování.
- Vypněte Photon.
- Nastavte
spark.databricks.streaming.realTimeMode.enabledna hodnotutrue. - Vypněte spotové instance, abyste se vyhnuli přerušení.
V případě úloh citlivých na latenci s funkcemi definovanými uživatelem doporučuje Databricks používat vyhrazený režim přístupu. Viz Funkce tabulky.
Pokyny k vytváření a konfiguraci klasických výpočetních prostředků najdete v referenčních informacích ke konfiguraci výpočetních prostředků.
Konfigurace dotazu
Pokud chcete spustit dotaz v režimu v reálném čase, musíte povolit trigger v reálném čase. Triggery v reálném čase se podporují jenom v režimu aktualizace.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Stanovení velikosti výpočetní kapacity
Pokud výpočetní prostředky mají dostatek slotů úloh, můžete spustit jednu úlohu v reálném čase na výpočetní prostředek.
Pokud chcete běžet v režimu s nízkou latencí, musí být celkový počet dostupných slotů úloh větší nebo roven počtu úkolů ve všech fázích dotazu.
Příklady výpočtů slotů
| Typ kanálu | Konfigurace | Požadované sloty |
|---|---|---|
| Jednostupňová bezstavová konfigurace (zdroj Kafka + výstup) |
maxPartitions = 8 |
8 slotů |
| Stavová dvoufázová fáze (zdroj Kafka + shuffle) |
maxPartitions = 8, prohazovací oddíly = 20 |
28 slotů (8 + 20) |
| Třístupňový (zdroj Kafka + shuffle + přerozdělení) |
maxPartitions = 8, dvě fáze mixování po 20 každá |
48 slotů (8 + 20 + 20) |
Pokud nenastavíte maxPartitions, použijte počet partitionů v tématu Kafka.