Verwenden des Echtzeitmodus in Lakeflow Spark Declarative Pipelines

Important

Der Echtzeitmodus in Lakeflow Spark Declarative Pipelines befindet sich in der öffentlichen Vorschau auf Databricks Runtime 18.1.2 im Vorschaukanal.

Der Echtzeitmodus ermöglicht die Ultra-Low-Latenz-Datenverarbeitung mit End-to-End-Latenz so niedrig wie fünf Millisekunden. Verwenden Sie den Echtzeitmodus für Betriebsarbeitslasten, die sofortige Reaktion auf Streamingdaten erfordern, z. B. Betrugserkennung und Echtzeitpersonalisierung.

Der Echtzeitmodus ist auch direkt in Structured Streaming außerhalb von Pipelines verfügbar. Sehen Sie sich den Echtzeitmodus im strukturierten Streaming an.

So erzielt der Echtzeitmodus eine niedrige Latenz

Der Echtzeitmodus unterscheidet sich von der standardmäßigen kontinuierlichen Verarbeitung auf drei wichtige Arten:

  • Lange laufende Batches: Das System verarbeitet Daten, sobald es in der Quelle innerhalb langer Batches verfügbar wird (Standardwert ist fünf Minuten).
  • Gleichzeitige Phasenplanung: Alle Abfragephasen werden gleichzeitig geplant. Die Computeressource muss über genügend verfügbare Vorgangsplätze verfügen, um alle Phasen gleichzeitig abzudecken. Siehe Dimensionierung der Rechenleistung.
  • Streaming-Shuffle: Daten werden zwischen den Phasen weitergegeben, sobald sie erzeugt werden, anstatt zu warten, bis eine vorgelagerte Phase abgeschlossen ist, bevor die nachgelagerte Phase beginnt.

Das Checkpoint-Intervall (konfiguriert über pipelines.trigger.interval) legt fest, wie häufig Zustände und Quell-Offsets in einem dauerhaften Speicher persistiert werden. Längere Intervalle verringern den Checkpointing-Overhead, erhöhen aber die Wiederherstellungszeit nach einem Ausfall und verzögern die Metrikberichterstattung. Kürzere Intervalle verbessern die Haltbarkeit, fügen aber mehr Aufwand hinzu.

Echtzeitmodus und fortlaufende Pipelines

Der Echtzeitmodus ist eine spezielle Art von fortlaufendem Trigger. Der kontinuierliche Modus ist weiterhin erforderlich – der Echtzeitmodus ergänzt ihn um Latenzoptimierungen auf Flussebene. Um den Echtzeitmodus zu verwenden, muss die Pipeline zuerst im fortlaufenden Modus ausgeführt werden. Der Echtzeitmodus wendet dann zusätzliche Optimierungen auf Flussebene an, um eine Sub-Second-Latenz zu erzielen, die über die standardmäßige fortlaufende Verarbeitung hinausgeht.

Für das Aktivieren des Echtzeitmodus sind drei Konfigurationsschritte erforderlich:

  1. Legen Sie die Pipeline auf den fortlaufenden Modus fest.
  2. Aktivieren Sie den Echtzeitmodus auf Pipelineebene.
  3. Definieren Sie einen Echtzeitaktualisierungsfluss.

Requirements

Requirement Value
Databricks-Laufzeit 18.1.2 im SDP-Vorschaukanal
Computetyp Klassische Compute- oder Serverlose

Konfigurieren des Echtzeitmodus

Schritt 1: Festlegen der Pipeline auf den fortlaufenden Modus

Legen Sie in den Pipelineeinstellungen den Pipelinemodus auf "Fortlaufend" fest, oder legen Sie ihn in der Pipeline-JSON fest:

{
  "continuous": true
}

Schritt 2: Aktivieren des Echtzeitmodus auf Pipelineebene

Fügen Sie in den Pipelineeinstellungen den folgenden Schlüssel zur Spark-Konfiguration unter "Erweiterte > Spark-Konfiguration" hinzu:

spark.databricks.streaming.realTimeMode.enabled = true

Sie können dies auch in der Pipeline-JSON festlegen:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Schritt 3: Definieren eines Echtzeitaktualisierungsflusses

Der Echtzeitmodus erfordert einen Aktualisierungsfluss. Verwenden Sie dp.create_sink(), um das Ausgabeziel zu definieren, und verwenden Sie dann den @dp.update_flow-Decorator, wobei pipelines.trigger auf "RealTime" gesetzt ist und target auf den Sink verweist.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Konfigurationsparameter auf Flussebene:

Parameter Erforderlich Vorgabe Description
pipelines.trigger Yes Auf "RealTime" festlegen, um den Echtzeitmodus für diesen Ablauf zu aktivieren.
pipelines.trigger.interval No "5 minutes" Prüfpunktintervall. Steuert, wie oft Zustand und Offsets gespeichert werden. Kürzere Werte verbessern die Wiederherstellbarkeit; längere Werte reduzieren den Mehraufwand.

Code-Beispiele

Kafka-zu-Kafka

Aus einem Kafka-Topic lesen und in ein Kafka-Ausgabeziel schreiben:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Anreichern mit einem Broadcast-Join

Verbinden eines Kafka-Datenstroms mit einer statischen Nachschlagetabelle Es werden nur Broadcast-Joins (stream-to-static) unterstützt. Stream-zu-Stream-Verknüpfungen werden im Echtzeitmodus nicht unterstützt.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Aggregation

Zählen von Ereignissen mithilfe eines zustandsbehafteten Schlüssels groupBy. Stellen Sie spark.sql.shuffle.partitions so ein, dass es der Anzahl der Eingabepartitionen für zustandsbehaftete Operationen entspricht:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Unterstützte Quellen und Empfänger

Konnektor Als Quelle Als Spüle Hinweise
Apache Kafka
AWS MSK Verwendet die kafka-kompatible Schnittstelle.
Azure Event Hubs (Kafka Connector) Verwendet die kafka-kompatible Schnittstelle.
Amazon Kinesis Nicht unterstützt Nur für den EFO-Modus (Enhanced Fan-Out) verwenden.
Delta Nicht unterstützt Nicht unterstützt

Dimensionierung der Computekapazität

Sie können eine Echtzeitpipeline pro Computeressource ausführen, wenn die Berechnung über genügend Vorgangsplätze verfügt. Verfügbare Aufgabenplätze müssen alle Aufgaben in allen Abfragephasen abdecken.

Pipelinetyp Configuration Erforderliche Aufgabenplätze
Einzelstufenzustandslos (Kafka Source + Sink) maxPartitions = 8 8
Zweistufiger Zustand (Kafka-Quelle + Shuffle) maxPartitions = 8, Shuffle-Partitionen = 20 28 (8 + 20)
Drei Stufen (Kafka-Quelle + zwei Shuffles) maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 48 (8 + 20 + 20)

Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.

Operatorunterstützung

Kategorie Bediener Unterstützt
Zustandslos Auswahl, Projektion
UDFs Skala UDF (*) (mit Einschränkungen)
UDFs Python-Benutzerdefinierte Funktion (UDF) (*) (mit Einschränkungen)
Aggregation Summe, Anzahl, Max, Min, Durchschnitt
Windowing Stürzen, Gleiten
Windowing Session Nicht unterstützt
Deduplication dropDuplicates ✔ (ungebundener Zustand)
Deduplication dropDuplicatesWithinWatermark Nicht unterstützt
Verknüpfungen Broadcast-Tabellenbeitritt
Verknüpfungen Stream-zu-Stream-Verknüpfung Nicht unterstützt
Custom transformWithState • (mit Verhaltensunterschieden)
Custom union (*) (mit Einschränkungen)
Custom forEach Nicht unterstützt
Custom flatMapGroupsWithState Nicht unterstützt
Custom mapPartitions Nicht unterstützt
Custom forEachBatch Nicht unterstützt

transformWithState im Echtzeitmodus

transformWithState wird im Echtzeitmodus mit den folgenden Unterschieden gegenüber der Mikrobatch-Verarbeitung unterstützt:

  • handleInputRows wird einmal pro Zeile und nicht einmal pro Schlüssel pro Batch aufgerufen. Der inputRows Iterator liefert einen einzelnen Wert pro Aufruf.
  • Ereigniszeitgeber werden nicht unterstützt. Timer für die Verarbeitungszeit werden ausgelöst, wenn ein lange laufender Batch beendet wird, ohne dass Daten angekommen sind.
  • transformWithStateInPandas wird nicht unterstützt.

Pandas UDFs im Echtzeitmodus

Um die Latenz bei Pandas-UDFs zu minimieren, setzen Sie spark.sql.execution.arrow.maxRecordsPerBatch auf 1. Dadurch wird die Latenz auf Kosten des Durchsatzes optimiert. Wenn der Durchsatz ebenfalls wichtig ist, legen Sie diesen Wert auf 100 oder höher fest.

Überwachen der Leistung des Echtzeitmodus

Im Echtzeitmodus werden Latenzmetriken im StreamingQueryProgresslatencies Feld verfügbar gemacht. Greifen Sie auf diese Metriken über eine StreamingQueryListener oder durch Überprüfen der lastProgress-Eigenschaft der Streamingabfrage zu.

Metric Description
processingLatencyMs Zeit zwischen dem Einlesen eines Datensatzes durch den Flow und seiner vollständigen Verarbeitung durch den Flow
sourceQueuingLatencyMs Zeit zwischen dem erfolgreichen Schreiben eines Datensatzes in den Nachrichtenbus (z. B. Protokollanfügezeit in Kafka) und wann er zum ersten Mal vom Fluss gelesen wird
e2eLatencyMs Gesamte End-to-End-Latenz vom Zeitpunkt der Erstellung des Datensatzes an der Quelle bis zu dem Zeitpunkt, an dem er vom Datenfluss vollständig verarbeitet wurde

Jede Metrik wird in Form von p50-, p90-, p95- und p99-Perzentilen angegeben.

Einschränkungen

Ein Echtzeitfluss pro Pipeline wird empfohlen. Mehrere Abläufe sind zulässig, aber Konflikte um Task-Slots zwischen Abläufen erhöhen die Latenz.

Eine vollständige Liste der Operator- und Quellbeschränkungen finden Sie unter Einschränkungen des Echtzeitmodus.

Weitere Ressourcen