Realtimemodus gebruiken in Lakeflow Spark-declaratieve pijplijnen

Important

De real-time-modus in Lakeflow Spark Declarative Pipelines is beschikbaar als Openbare preview in Databricks Runtime 18.1.2 op het preview-kanaal.

De real-timemodus maakt gegevensverwerking met een ultralage latentie mogelijk, met een end-to-end latentie van slechts vijf milliseconden. Gebruik de realtimemodus voor operationele workloads waarvoor directe reactie op streaminggegevens is vereist, zoals fraudedetectie en realtime personalisatie.

De realtimemodus is ook rechtstreeks beschikbaar in Structured Streaming buiten pijplijnen. Bekijk de real-time modus in Structured Streaming.

Hoe de realtime-modus lage latentie bereikt

Realtimemodus verschilt op drie belangrijke manieren van standaard continue verwerking:

  • Langlopende batches: het systeem verwerkt gegevens naarmate deze beschikbaar is in de bron binnen langlopende batches (standaard is vijf minuten).
  • Gelijktijdige planning van fasen: Alle queryfasen worden tegelijkertijd gepland. De rekenbron moet over voldoende beschikbare taakslots beschikken zodat alle fasen gelijktijdig kunnen worden uitgevoerd. Zie Rekengrootte.
  • Streaming shuffle: gegevens worden doorgegeven tussen fasen zodra deze worden geproduceerd, in plaats van te wachten tot een upstream-fase is voltooid voordat de downstreamfase wordt gestart.

Het controlepuntinterval (geconfigureerd via pipelines.trigger.interval) bepaalt hoe vaak status- en bron offsets behouden blijven voor duurzame opslag. Langere intervallen verminderen de overhead van controlepunten, maar verhogen de hersteltijd na een fout- en vertragingsrapportage met metrische gegevens. Kortere intervallen verbeteren de duurzaamheid, maar voegen overhead toe.

Realtimemodus en continue pijplijnen

De real-time-modus is een specifieke vorm van een doorlopende trigger. Continue modus is nog steeds vereist: realtimemodus voegt latentieoptimalisaties op stroomniveau toe. Als u de realtime-modus wilt gebruiken, moet de pijplijn eerst in de continue modus draaien. De realtimemodus past vervolgens extra optimalisaties toe op stroomniveau om een latentie van sub-seconde te bereiken dan wat standaard continue verwerking biedt.

Voor het inschakelen van de realtimemodus zijn drie configuratiestappen vereist:

  1. Stel de pijplijn in op continue modus.
  2. Schakel de realtimemodus in op pijplijnniveau.
  3. Definieer een real-time updateflow.

Requirements

Requirement Value
Databricks Runtime 18.1.2 op het SDP Preview-kanaal
Rekentype Klassiek rekenproces of serverloos

Realtime-modus configureren

Stap 1: De pijplijn instellen op continue modus

Stel in de pijplijninstellingen de pijplijnmodus in op Doorlopend of stel deze in de pijplijn-JSON in:

{
  "continuous": true
}

Stap 2: Realtimemodus inschakelen op pijplijnniveau

Voeg in uw pijplijninstellingen de volgende sleutel toe aan de Spark-configuratie onder Geavanceerde > Spark-configuratie:

spark.databricks.streaming.realTimeMode.enabled = true

U kunt dit ook instellen in de pijplijn-JSON:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Stap 3: Een realtime updatestroom definiëren

Voor de realtimemodus is een updatestroom vereist. Gebruik dp.create_sink() dit om het uitvoerdoel te definiëren en gebruik vervolgens de @dp.update_flow decorator die pipelines.trigger is ingesteld op "RealTime" en target wijst naar de sink.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Configuratieparameters op stroomniveau:

Parameter Verplicht Verstek Description
pipelines.trigger Yes Stel in op "RealTime" om de real-time-modus voor deze flow in te schakelen.
pipelines.trigger.interval No "5 minutes" Controlepuntinterval. Hiermee wordt bepaald hoe vaak status en offsets worden weggeschreven. Kortere waarden verbeteren de herstelbaarheid; langere waarden verminderen de overhead.

Codevoorbeelden

Kafka naar Kafka

Lees vanuit een Kafka-onderwerp en schrijf naar een Kafka-uitvoerdoel:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Verrijken met een uitzendingsdeelname

Voeg een Kafka-stream toe aan een statische opzoektabel. Alleen broadcast-samenvoegingen (stream-to-static) worden ondersteund. Stream-to-stream-joins worden niet ondersteund in de realtime-modus.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Aggregatie

Gebeurtenissen per sleutel tellen met behulp van een groupBy met status. Stel spark.sql.shuffle.partitions in op hetzelfde aantal als het aantal invoerpartities voor stateful bewerkingen:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Ondersteunde bronnen en sinks

Connector Als bron Als gegevenssink Opmerkingen
Apache Kafka
AWS MSK Maakt gebruik van de kafka-compatibele interface.
Azure Event Hubs (Kafka connector) Maakt gebruik van de kafka-compatibele interface.
Amazon Kinesis Niet ondersteund Alleen gebruiken voor de EFO-modus (Enhanced Fan-Out).
Delta Niet ondersteund Niet ondersteund

Grootte berekenen

U kunt één realtime pijplijn per compute-resource uitvoeren als de compute-resource voldoende taakslots heeft. Beschikbare taakslots moeten alle taken in alle fasen van de query dekken.

Pijplijntype Configuration Vereiste taaksloten
Staatloos met één fase (Kafka-bron + sink) maxPartitions = 8 8
Twee-fasen toestandsafhankelijk systeem (Kafka-bron + shuffle) maxPartitions = 8, partities verdelen = 20 28 (8 + 20)
Drie fasen (Kafka-bron + twee willekeurige volgordes) maxPartitions = 8, twee willekeurige fasen van elk 20 48 (8 + 20 + 20)

Als u maxPartitions niet instelt, gebruikt u het aantal partities van het Kafka-topic.

Operatorondersteuning

Categorie Operator Supported
Staatloos: Selectie, projectie
UDFs Scala UDF ✓ (met beperkingen)
UDFs Python UDF ✓ (met beperkingen)
Aggregatie som, aantal, max, min, gem.
Windowing Tumbling, glijdend
Windowing Session Niet ondersteund
Ontdubbeling dropDuplicates ✓ (niet-gebonden status)
Ontdubbeling dropDuplicatesWithinWatermark Niet ondersteund
Joins Tabeldeelname uitzenden
Joins Samenvoeging van stroom met stroom Niet ondersteund
Custom transformWithState ✓ (met gedragsverschillen)
Custom union ✓ (met beperkingen)
Custom forEach Niet ondersteund
Custom flatMapGroupsWithState Niet ondersteund
Custom mapPartitions Niet ondersteund
Custom forEachBatch Niet ondersteund

transformWithState in realtimemodus

transformWithState wordt ondersteund in realtime-modus met de volgende verschillen van microbatchverwerking:

  • handleInputRows wordt eenmaal per rij aangeroepen in plaats van één keer per sleutel per batch. De inputRows iterator resulteert in één waarde per aanroep.
  • Event-timetimers worden niet ondersteund. Verwerkingstijdtimer wordt geactiveerd wanneer een langlopende batch wordt beëindigd als er geen gegevens zijn aangekomen.
  • transformWithStateInPandas wordt niet ondersteund.

Pandas UDF's in realtimemodus

Als u de latentie met pandas UDF's wilt minimaliseren, stelt u deze in op spark.sql.execution.arrow.maxRecordsPerBatch1. Hierbij wordt de latentie geoptimaliseerd ten koste van de doorvoer. Als doorvoer ook belangrijk is, stelt u deze waarde in op 100 of hoger.

Prestaties van realtimemodus bewaken

In de realtime-modus worden latentiemetrieken in StreamingQueryProgress onder het veld latencies weergegeven. Benader deze statistieken via een StreamingQueryListener of door de eigenschap lastProgress van de streaming-query te inspecteren.

Metrische gegevens Description
processingLatencyMs Tijd tussen het moment waarop een record wordt gelezen door de stroom en wanneer deze volledig wordt verwerkt door de stroom
sourceQueuingLatencyMs Tijd tussen het moment waarop een record succesvol naar de berichtenbus is geschreven (bijvoorbeeld de logtoevoegtijd in Kafka) en het moment waarop het voor het eerst door de gegevensstroom wordt gelezen
e2eLatencyMs Totale end-to-end-latentie vanaf het moment dat het record bij de bron wordt aangemaakt tot het moment dat het volledig door de gegevensstroom is verwerkt

Elke metrische waarde wordt gerapporteerd als p50, p90, p95 en p99 percentielen.

Limitations

Eén realtimegegevensstroom per pipeline wordt aanbevolen. Er zijn meerdere stromen toegestaan, maar taaksiteconflicten tussen stromen verhogen de latentie.

Zie Realtime modusbeperkingen voor een volledige lijst met operator- en bronbeperkingen.

Aanvullende bronnen