Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite werden die voraussetzungen und die Konfiguration beschrieben, die zum Ausführen von Echtzeitmodusabfragen im strukturierten Streaming erforderlich sind. Ein schrittweises Lernprogramm finden Sie im Lernprogramm: Ausführen einer Echtzeit-Streaming-Workload. Konzeptionelle Informationen zum Echtzeitmodus finden Sie im Echtzeitmodus im strukturierten Streaming.
Voraussetzungen
Um den Echtzeitmodus zu verwenden, müssen Sie Ihre Berechnung so konfigurieren, dass sie die folgenden Anforderungen erfüllt:
- Verwenden Sie den dedizierten Zugriffsmodus auf klassischem Computing. Standardzugriffsmodus, Lakeflow Spark Declarative Pipelines und serverlose Cluster werden nicht unterstützt.
- Verwenden Sie Databricks Runtime 16.4 LTS und höher.
- Deaktivieren Sie die automatische Skalierung.
- Photon deaktivieren.
- Setzen Sie
spark.databricks.streaming.realTimeMode.enabledauftrue. - Schalten Sie Spotinstanzen aus, um Unterbrechungen zu vermeiden.
Anweisungen zum Erstellen und Konfigurieren einer klassischen Compute-Instanz finden Sie in der Referenz zur Computekonfiguration.
Abfragekonfiguration
Um eine Abfrage im Echtzeitmodus auszuführen, müssen Sie den Echtzeittrigger aktivieren. Echtzeittrigger werden nur im Updatemodus unterstützt.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Skalierung von Berechnungen
Sie können einen Echtzeitauftrag pro Rechenressource ausführen, wenn die Rechenressource über genügend Task-Slots verfügt.
Um im Modus mit geringer Latenz ausgeführt zu werden, muss die Gesamtanzahl der verfügbaren Aufgabenplätze größer oder gleich der Anzahl der Aufgaben in allen Abfragephasen sein.
Slot-Berechnungsbeispiele
| Pipelinetyp | Konfiguration | Erforderliche Steckplätze |
|---|---|---|
| Einzelstufenzustandslos (Kafka Source + Sink) |
maxPartitions = 8 |
8 Steckplätze |
| Zweistufiger Zustand (Kafka-Quelle + Shuffle) |
maxPartitions = 8, Shuffle-Partitionen = 20 |
28 Steckplätze (8 + 20) |
| Dreistufige (Kafka-Quelle + *shuffle* + *repartition*) |
maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 |
48 Steckplätze (8 + 20 + 20) |
Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.