Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Der Echtzeitmodus in Lakeflow Spark Declarative Pipelines befindet sich in der öffentlichen Vorschau auf Databricks Runtime 18.1.2 im Vorschaukanal.
Der Echtzeitmodus ermöglicht die Ultra-Low-Latenz-Datenverarbeitung mit End-to-End-Latenz so niedrig wie fünf Millisekunden. Verwenden Sie den Echtzeitmodus für Betriebsarbeitslasten, die sofortige Reaktion auf Streamingdaten erfordern, z. B. Betrugserkennung und Echtzeitpersonalisierung.
Der Echtzeitmodus ist auch direkt in Structured Streaming außerhalb von Pipelines verfügbar. Sehen Sie sich den Echtzeitmodus im strukturierten Streaming an.
So erzielt der Echtzeitmodus eine niedrige Latenz
Der Echtzeitmodus unterscheidet sich von der standardmäßigen kontinuierlichen Verarbeitung auf drei wichtige Arten:
- Lange laufende Batches: Das System verarbeitet Daten, sobald es in der Quelle innerhalb langer Batches verfügbar wird (Standardwert ist fünf Minuten).
- Gleichzeitige Phasenplanung: Alle Abfragephasen werden gleichzeitig geplant. Die Computeressource muss über genügend verfügbare Vorgangsplätze verfügen, um alle Phasen gleichzeitig abzudecken. Siehe Dimensionierung der Rechenleistung.
- Streaming-Shuffle: Daten werden zwischen den Phasen weitergegeben, sobald sie erzeugt werden, anstatt zu warten, bis eine vorgelagerte Phase abgeschlossen ist, bevor die nachgelagerte Phase beginnt.
Das Checkpoint-Intervall (konfiguriert über pipelines.trigger.interval) legt fest, wie häufig Zustände und Quell-Offsets in einem dauerhaften Speicher persistiert werden. Längere Intervalle verringern den Checkpointing-Overhead, erhöhen aber die Wiederherstellungszeit nach einem Ausfall und verzögern die Metrikberichterstattung. Kürzere Intervalle verbessern die Haltbarkeit, fügen aber mehr Aufwand hinzu.
Echtzeitmodus und fortlaufende Pipelines
Der Echtzeitmodus ist eine spezielle Art von fortlaufendem Trigger. Der kontinuierliche Modus ist weiterhin erforderlich – der Echtzeitmodus ergänzt ihn um Latenzoptimierungen auf Flussebene. Um den Echtzeitmodus zu verwenden, muss die Pipeline zuerst im fortlaufenden Modus ausgeführt werden. Der Echtzeitmodus wendet dann zusätzliche Optimierungen auf Flussebene an, um eine Sub-Second-Latenz zu erzielen, die über die standardmäßige fortlaufende Verarbeitung hinausgeht.
Für das Aktivieren des Echtzeitmodus sind drei Konfigurationsschritte erforderlich:
- Legen Sie die Pipeline auf den fortlaufenden Modus fest.
- Aktivieren Sie den Echtzeitmodus auf Pipelineebene.
- Definieren Sie einen Echtzeitaktualisierungsfluss.
Requirements
| Requirement | Value |
|---|---|
| Databricks-Laufzeit | 18.1.2 im SDP-Vorschaukanal |
| Computetyp | Klassische Compute- oder Serverlose |
Konfigurieren des Echtzeitmodus
Schritt 1: Festlegen der Pipeline auf den fortlaufenden Modus
Legen Sie in den Pipelineeinstellungen den Pipelinemodus auf "Fortlaufend" fest, oder legen Sie ihn in der Pipeline-JSON fest:
{
"continuous": true
}
Schritt 2: Aktivieren des Echtzeitmodus auf Pipelineebene
Fügen Sie in den Pipelineeinstellungen den folgenden Schlüssel zur Spark-Konfiguration unter "Erweiterte > Spark-Konfiguration" hinzu:
spark.databricks.streaming.realTimeMode.enabled = true
Sie können dies auch in der Pipeline-JSON festlegen:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Schritt 3: Definieren eines Echtzeitaktualisierungsflusses
Der Echtzeitmodus erfordert einen Aktualisierungsfluss. Verwenden Sie dp.create_sink(), um das Ausgabeziel zu definieren, und verwenden Sie dann den @dp.update_flow-Decorator, wobei pipelines.trigger auf "RealTime" gesetzt ist und target auf den Sink verweist.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Konfigurationsparameter auf Flussebene:
| Parameter | Erforderlich | Vorgabe | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | Auf "RealTime" festlegen, um den Echtzeitmodus für diesen Ablauf zu aktivieren. |
pipelines.trigger.interval |
No | "5 minutes" |
Prüfpunktintervall. Steuert, wie oft Zustand und Offsets gespeichert werden. Kürzere Werte verbessern die Wiederherstellbarkeit; längere Werte reduzieren den Mehraufwand. |
Code-Beispiele
Kafka-zu-Kafka
Aus einem Kafka-Topic lesen und in ein Kafka-Ausgabeziel schreiben:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Anreichern mit einem Broadcast-Join
Verbinden eines Kafka-Datenstroms mit einer statischen Nachschlagetabelle Es werden nur Broadcast-Joins (stream-to-static) unterstützt. Stream-zu-Stream-Verknüpfungen werden im Echtzeitmodus nicht unterstützt.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Aggregation
Zählen von Ereignissen mithilfe eines zustandsbehafteten Schlüssels groupBy. Stellen Sie spark.sql.shuffle.partitions so ein, dass es der Anzahl der Eingabepartitionen für zustandsbehaftete Operationen entspricht:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Unterstützte Quellen und Empfänger
| Konnektor | Als Quelle | Als Spüle | Hinweise |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Verwendet die kafka-kompatible Schnittstelle. |
| Azure Event Hubs (Kafka Connector) | ✓ | ✓ | Verwendet die kafka-kompatible Schnittstelle. |
| Amazon Kinesis | ✓ | Nicht unterstützt | Nur für den EFO-Modus (Enhanced Fan-Out) verwenden. |
| Delta | Nicht unterstützt | Nicht unterstützt | — |
Dimensionierung der Computekapazität
Sie können eine Echtzeitpipeline pro Computeressource ausführen, wenn die Berechnung über genügend Vorgangsplätze verfügt. Verfügbare Aufgabenplätze müssen alle Aufgaben in allen Abfragephasen abdecken.
| Pipelinetyp | Configuration | Erforderliche Aufgabenplätze |
|---|---|---|
| Einzelstufenzustandslos (Kafka Source + Sink) |
maxPartitions = 8 |
8 |
| Zweistufiger Zustand (Kafka-Quelle + Shuffle) |
maxPartitions = 8, Shuffle-Partitionen = 20 |
28 (8 + 20) |
| Drei Stufen (Kafka-Quelle + zwei Shuffles) |
maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 |
48 (8 + 20 + 20) |
Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.
Operatorunterstützung
| Kategorie | Bediener | Unterstützt |
|---|---|---|
| Zustandslos | Auswahl, Projektion | ✓ |
| UDFs | Skala UDF | (*) (mit Einschränkungen) |
| UDFs | Python-Benutzerdefinierte Funktion (UDF) | (*) (mit Einschränkungen) |
| Aggregation | Summe, Anzahl, Max, Min, Durchschnitt | ✓ |
| Windowing | Stürzen, Gleiten | ✓ |
| Windowing | Session | Nicht unterstützt |
| Deduplication | dropDuplicates |
✔ (ungebundener Zustand) |
| Deduplication | dropDuplicatesWithinWatermark |
Nicht unterstützt |
| Verknüpfungen | Broadcast-Tabellenbeitritt | ✓ |
| Verknüpfungen | Stream-zu-Stream-Verknüpfung | Nicht unterstützt |
| Custom | transformWithState |
• (mit Verhaltensunterschieden) |
| Custom | union |
(*) (mit Einschränkungen) |
| Custom | forEach |
Nicht unterstützt |
| Custom | flatMapGroupsWithState |
Nicht unterstützt |
| Custom | mapPartitions |
Nicht unterstützt |
| Custom | forEachBatch |
Nicht unterstützt |
transformWithState im Echtzeitmodus
transformWithState wird im Echtzeitmodus mit den folgenden Unterschieden gegenüber der Mikrobatch-Verarbeitung unterstützt:
-
handleInputRowswird einmal pro Zeile und nicht einmal pro Schlüssel pro Batch aufgerufen. DerinputRowsIterator liefert einen einzelnen Wert pro Aufruf. - Ereigniszeitgeber werden nicht unterstützt. Timer für die Verarbeitungszeit werden ausgelöst, wenn ein lange laufender Batch beendet wird, ohne dass Daten angekommen sind.
-
transformWithStateInPandaswird nicht unterstützt.
Pandas UDFs im Echtzeitmodus
Um die Latenz bei Pandas-UDFs zu minimieren, setzen Sie spark.sql.execution.arrow.maxRecordsPerBatch auf 1. Dadurch wird die Latenz auf Kosten des Durchsatzes optimiert. Wenn der Durchsatz ebenfalls wichtig ist, legen Sie diesen Wert auf 100 oder höher fest.
Überwachen der Leistung des Echtzeitmodus
Im Echtzeitmodus werden Latenzmetriken im StreamingQueryProgresslatencies Feld verfügbar gemacht. Greifen Sie auf diese Metriken über eine StreamingQueryListener oder durch Überprüfen der lastProgress-Eigenschaft der Streamingabfrage zu.
| Metric | Description |
|---|---|
processingLatencyMs |
Zeit zwischen dem Einlesen eines Datensatzes durch den Flow und seiner vollständigen Verarbeitung durch den Flow |
sourceQueuingLatencyMs |
Zeit zwischen dem erfolgreichen Schreiben eines Datensatzes in den Nachrichtenbus (z. B. Protokollanfügezeit in Kafka) und wann er zum ersten Mal vom Fluss gelesen wird |
e2eLatencyMs |
Gesamte End-to-End-Latenz vom Zeitpunkt der Erstellung des Datensatzes an der Quelle bis zu dem Zeitpunkt, an dem er vom Datenfluss vollständig verarbeitet wurde |
Jede Metrik wird in Form von p50-, p90-, p95- und p99-Perzentilen angegeben.
Einschränkungen
Ein Echtzeitfluss pro Pipeline wird empfohlen. Mehrere Abläufe sind zulässig, aber Konflikte um Task-Slots zwischen Abläufen erhöhen die Latenz.
Eine vollständige Liste der Operator- und Quellbeschränkungen finden Sie unter Einschränkungen des Echtzeitmodus.