Použijte režim v reálném čase v Lakeflow Spark Declarative Pipelines

Important

Režim v reálném čase v deklarativních kanálech Lakeflow Sparku je ve verzi Public Preview v databricks Runtime 18.1.2 v kanálu Preview.

Režim v reálném čase umožňuje zpracování dat s ultranízkou latencí, přičemž koncová latence může být pouhých pět milisekund. Pro provozní úlohy, které vyžadují okamžitou reakci na streamovaná data, jako je detekce podvodů a přizpůsobení v reálném čase, použijte režim v reálném čase.

Režim v reálném čase je také dostupný přímo v nástroji Structured Streaming i mimo pipeline. Viz režim v reálném čase ve strukturovaném streamování.

Jak režim v reálném čase dosahuje nízké latence

Režim v reálném čase se liší od standardního průběžného zpracování třemi klíčovými způsoby:

  • Dlouhotrvající dávky: Systém zpracovává data, jakmile jsou ve zdroji k dispozici v dlouhotrvajících dávkách (výchozí hodnota je pět minut).
  • Souběžné plánování fází: Všechny fáze dotazu jsou naplánovány současně. Výpočetní prostředek musí mít dostatek dostupných slotů úloh pro souběžné pokrytí všech fází. Viz Určení velikosti výpočetních prostředků.
  • Streamované promíchání: Data se předávají mezi fázemi, jakmile jsou vytvořena, místo aby se čekalo na dokončení předchozí fáze před spuštěním následující fáze.

Interval kontrolního bodu (nakonfigurovaný prostřednictvím pipelines.trigger.interval) určuje, jak často se posuny stavu a zdroje uchovávají v trvalém úložišti. Delší intervaly snižují režii spojenou s vytvářením kontrolních bodů, ale prodlužují dobu zotavení po poruše a zpožďují vykazování metrik. Kratší intervaly zlepšují odolnost, ale zvyšují režii.

Režim v reálném čase a průběžné kanály

Režim reálného času je specializovaný typ kontinuálního spouštěče. Nepřetržitý režim je stále vyžadován — režim reálného času navíc přidává optimalizace latence na úrovni datového toku. Chcete-li použít režim v reálném čase, pipeline musí nejprve běžet v kontinuálním režimu. Režim v reálném čase pak použije další optimalizace na úrovni toku, aby se dosáhlo latence podsekundy nad rámec toho, co poskytuje standardní průběžné zpracování.

Povolení režimu v reálném čase vyžaduje tři kroky konfigurace:

  1. Nastavte kanál na nepřetržitý režim.
  2. Povolte režim v reálném čase na úrovni kanálu.
  3. Definujte tok aktualizace v reálném čase.

Requirements

Requirement Value
Databricks Runtime 18.1.2 v kanálu SDP Preview
Typ výpočetních prostředků Klasické výpočetní prostředí nebo bezserverové prostředí

Konfigurace režimu v reálném čase

Krok 1: Nastavte pipeline do kontinuálního režimu

V nastavení kanálu nastavte režim kanálu na Průběžný nebo ho nastavte ve formátu JSON kanálu:

{
  "continuous": true
}

Krok 2: Povolení režimu v reálném čase na úrovni kanálu

V nastavení kanálu přidejte do konfigurace Sparku následující klíč v části Pokročilá > konfigurace Sparku:

spark.databricks.streaming.realTimeMode.enabled = true

Můžete to také nastavit v JSON kanálu:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Krok 3: Definování toku aktualizace v reálném čase

Režim v reálném čase vyžaduje tok aktualizace. Pomocí dp.create_sink() definujte výstupní cíl a poté použijte dekorátor @dp.update_flow s parametrem pipelines.trigger nastaveným na "RealTime" a s parametrem target odkazujícím na jímku.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parametry konfigurace na úrovni toku:

Parameter Povinné Výchozí Description
pipelines.trigger Ano Nastavte na "RealTime", chcete-li pro tento tok povolit režim v reálném čase.
pipelines.trigger.interval Ne "5 minutes" Interval kontrolního bodu Určuje, jak často se stav a offsety ukládají. Kratší hodnoty zlepšují obnovitelnost; delší hodnoty snižují režijní náklady.

Příklady kódu

z Kafka do Kafka

Čtení z tématu Kafka a zápis do výstupního cíle Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Obohaťte pomocí broadcast spojení

Připojení datového proudu Kafka ke statické vyhledávací tabulce Podporovány jsou pouze operace JOIN typu broadcast (stream-to-static). Spojení stream-to-stream nejsou v režimu v reálném čase podporována.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agregace

Počítejte události podle klíče pomocí stavového groupBy. Nastavit spark.sql.shuffle.partitions tak, aby odpovídal počtu vstupních oddílů pro stavové operace:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Podporované zdroje a jímky

Connector Jako zdroj Jako jímka Poznámky
Apache Kafka
AWS MSK Používá rozhraní kompatibilní se systémem Kafka.
Azure Event Hubs (konektor Kafka) Používá rozhraní kompatibilní se systémem Kafka.
Amazon Kinesis Nepodporováno Používá se pouze pro režim EFO (enhanced Fan-Out).
Delta Nepodporováno Nepodporováno

Určení velikosti výpočetního výkonu

Pokud má výpočetní prostředek dostatek slotů úloh, můžete na něm spustit jednu pipeline v reálném čase. Dostupné sloty úkolů musí zahrnovat všechny úkoly ve všech fázích dotazu.

Typ kanálu Configuration Požadovaný počet slotů pro úlohy
Jednostupňová bezstavová konfigurace (zdroj Kafka + výstup) maxPartitions = 8 8
Stavová dvoufázová fáze (zdroj Kafka + shuffle) maxPartitions = 8, prohazovací oddíly = 20 28 (8 + 20)
Třífázová (zdroj Kafka + dva náhodné prohazy) maxPartitions = 8, dvě fáze mixování po 20 každá 48 (8 + 20 + 20)

Pokud nenastavíte maxPartitions, použijte počet partitionů v tématu Kafka.

Podpora operátorů

Kategorie Operator Podporováno
Bezstavové Výběr, promítání
UDFs Scala UDF – (s omezeními)
UDFs Uživatelsky definovaná funkce v Pythonu – (s omezeními)
Agregace součet, počet, maximum, minimum, průměr
Windowing Kutálení, posouvání
Windowing Session Nepodporováno
Deduplication dropDuplicates – (nevázaný stav)
Deduplication dropDuplicatesWithinWatermark Nepodporováno
Joins Spojení tabulky všesměrové vysílání
Joins Připojení stream-to-stream Nepodporováno
Custom transformWithState – (s rozdíly chování)
Custom union – (s omezeními)
Custom forEach Nepodporováno
Custom flatMapGroupsWithState Nepodporováno
Custom mapPartitions Nepodporováno
Custom forEachBatch Nepodporováno

transformWithState v režimu v reálném čase

transformWithState je podporován v režimu v reálném čase s následujícími rozdíly od mikrodávkového zpracování:

  • handleInputRows se vyvolá jednou pro každý řádek, nikoli jednou pro každý klíč v dávce. inputRows iterátor vrací při každém vyvolání jednu hodnotu.
  • Časovače času událostí nejsou podporovány. Časovače času zpracování se spustí, když se dlouhotrvající dávka ukončí, pokud nedorazila žádná data.
  • transformWithStateInPandas se nepodporuje.

Uživatelsky definované funkce Pandas v reálném čase

Chcete-li minimalizovat latenci u rozhraní UDF pandas, nastavte spark.sql.execution.arrow.maxRecordsPerBatch na 1. To optimalizuje latenci na úkor propustnosti. Pokud je propustnost také důležitá, nastavte tuto hodnotu na 100 nebo vyšší.

Monitorování výkonu režimu v reálném čase

Režim v reálném čase zveřejňuje metriky latence v StreamingQueryProgress rámci latencies pole. K těmto metrikám můžete přistupovat prostřednictvím StreamingQueryListener nebo kontrolou vlastnosti lastProgress u streamovacího dotazu.

Metrika Description
processingLatencyMs Doba mezi načtením záznamu tokem a jejím úplným zpracováním
sourceQueuingLatencyMs Doba mezi okamžikem, kdy je záznam úspěšně zapsán do sběrnice zpráv (například čas přidání do logu v systému Kafka), a okamžikem, kdy jej datový tok poprvé přečte
e2eLatencyMs Celková celková latence od okamžiku, kdy se záznam vytvoří ve zdroji až po jeho úplné zpracování tokem

Každá metrika se hlásí jako percentily p50, p90, p95 a p99.

Omezení

Doporučuje se jeden tok v reálném čase na jednu pipeline. Je povoleno více toků, ale soupeření o sloty úloh mezi toky zvyšuje latenci.

Úplný seznam omezení operátorů a zdrojů najdete v tématu Omezení režimu v reálném čase.

Dodatečné zdroje