Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Important
Režim v reálném čase v deklarativních kanálech Lakeflow Sparku je ve verzi Public Preview v databricks Runtime 18.1.2 v kanálu Preview.
Režim v reálném čase umožňuje zpracování dat s ultranízkou latencí, přičemž koncová latence může být pouhých pět milisekund. Pro provozní úlohy, které vyžadují okamžitou reakci na streamovaná data, jako je detekce podvodů a přizpůsobení v reálném čase, použijte režim v reálném čase.
Režim v reálném čase je také dostupný přímo v nástroji Structured Streaming i mimo pipeline. Viz režim v reálném čase ve strukturovaném streamování.
Jak režim v reálném čase dosahuje nízké latence
Režim v reálném čase se liší od standardního průběžného zpracování třemi klíčovými způsoby:
- Dlouhotrvající dávky: Systém zpracovává data, jakmile jsou ve zdroji k dispozici v dlouhotrvajících dávkách (výchozí hodnota je pět minut).
- Souběžné plánování fází: Všechny fáze dotazu jsou naplánovány současně. Výpočetní prostředek musí mít dostatek dostupných slotů úloh pro souběžné pokrytí všech fází. Viz Určení velikosti výpočetních prostředků.
- Streamované promíchání: Data se předávají mezi fázemi, jakmile jsou vytvořena, místo aby se čekalo na dokončení předchozí fáze před spuštěním následující fáze.
Interval kontrolního bodu (nakonfigurovaný prostřednictvím pipelines.trigger.interval) určuje, jak často se posuny stavu a zdroje uchovávají v trvalém úložišti. Delší intervaly snižují režii spojenou s vytvářením kontrolních bodů, ale prodlužují dobu zotavení po poruše a zpožďují vykazování metrik. Kratší intervaly zlepšují odolnost, ale zvyšují režii.
Režim v reálném čase a průběžné kanály
Režim reálného času je specializovaný typ kontinuálního spouštěče. Nepřetržitý režim je stále vyžadován — režim reálného času navíc přidává optimalizace latence na úrovni datového toku. Chcete-li použít režim v reálném čase, pipeline musí nejprve běžet v kontinuálním režimu. Režim v reálném čase pak použije další optimalizace na úrovni toku, aby se dosáhlo latence podsekundy nad rámec toho, co poskytuje standardní průběžné zpracování.
Povolení režimu v reálném čase vyžaduje tři kroky konfigurace:
- Nastavte kanál na nepřetržitý režim.
- Povolte režim v reálném čase na úrovni kanálu.
- Definujte tok aktualizace v reálném čase.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 v kanálu SDP Preview |
| Typ výpočetních prostředků | Klasické výpočetní prostředí nebo bezserverové prostředí |
Konfigurace režimu v reálném čase
Krok 1: Nastavte pipeline do kontinuálního režimu
V nastavení kanálu nastavte režim kanálu na Průběžný nebo ho nastavte ve formátu JSON kanálu:
{
"continuous": true
}
Krok 2: Povolení režimu v reálném čase na úrovni kanálu
V nastavení kanálu přidejte do konfigurace Sparku následující klíč v části Pokročilá > konfigurace Sparku:
spark.databricks.streaming.realTimeMode.enabled = true
Můžete to také nastavit v JSON kanálu:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Krok 3: Definování toku aktualizace v reálném čase
Režim v reálném čase vyžaduje tok aktualizace. Pomocí dp.create_sink() definujte výstupní cíl a poté použijte dekorátor @dp.update_flow s parametrem pipelines.trigger nastaveným na "RealTime" a s parametrem target odkazujícím na jímku.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Parametry konfigurace na úrovni toku:
| Parameter | Povinné | Výchozí | Description |
|---|---|---|---|
pipelines.trigger |
Ano | — | Nastavte na "RealTime", chcete-li pro tento tok povolit režim v reálném čase. |
pipelines.trigger.interval |
Ne | "5 minutes" |
Interval kontrolního bodu Určuje, jak často se stav a offsety ukládají. Kratší hodnoty zlepšují obnovitelnost; delší hodnoty snižují režijní náklady. |
Příklady kódu
z Kafka do Kafka
Čtení z tématu Kafka a zápis do výstupního cíle Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Obohaťte pomocí broadcast spojení
Připojení datového proudu Kafka ke statické vyhledávací tabulce Podporovány jsou pouze operace JOIN typu broadcast (stream-to-static). Spojení stream-to-stream nejsou v režimu v reálném čase podporována.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agregace
Počítejte události podle klíče pomocí stavového groupBy. Nastavit spark.sql.shuffle.partitions tak, aby odpovídal počtu vstupních oddílů pro stavové operace:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Podporované zdroje a jímky
| Connector | Jako zdroj | Jako jímka | Poznámky |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Používá rozhraní kompatibilní se systémem Kafka. |
| Azure Event Hubs (konektor Kafka) | ✓ | ✓ | Používá rozhraní kompatibilní se systémem Kafka. |
| Amazon Kinesis | ✓ | Nepodporováno | Používá se pouze pro režim EFO (enhanced Fan-Out). |
| Delta | Nepodporováno | Nepodporováno | — |
Určení velikosti výpočetního výkonu
Pokud má výpočetní prostředek dostatek slotů úloh, můžete na něm spustit jednu pipeline v reálném čase. Dostupné sloty úkolů musí zahrnovat všechny úkoly ve všech fázích dotazu.
| Typ kanálu | Configuration | Požadovaný počet slotů pro úlohy |
|---|---|---|
| Jednostupňová bezstavová konfigurace (zdroj Kafka + výstup) |
maxPartitions = 8 |
8 |
| Stavová dvoufázová fáze (zdroj Kafka + shuffle) |
maxPartitions = 8, prohazovací oddíly = 20 |
28 (8 + 20) |
| Třífázová (zdroj Kafka + dva náhodné prohazy) |
maxPartitions = 8, dvě fáze mixování po 20 každá |
48 (8 + 20 + 20) |
Pokud nenastavíte maxPartitions, použijte počet partitionů v tématu Kafka.
Podpora operátorů
| Kategorie | Operator | Podporováno |
|---|---|---|
| Bezstavové | Výběr, promítání | ✓ |
| UDFs | Scala UDF | – (s omezeními) |
| UDFs | Uživatelsky definovaná funkce v Pythonu | – (s omezeními) |
| Agregace | součet, počet, maximum, minimum, průměr | ✓ |
| Windowing | Kutálení, posouvání | ✓ |
| Windowing | Session | Nepodporováno |
| Deduplication | dropDuplicates |
– (nevázaný stav) |
| Deduplication | dropDuplicatesWithinWatermark |
Nepodporováno |
| Joins | Spojení tabulky všesměrové vysílání | ✓ |
| Joins | Připojení stream-to-stream | Nepodporováno |
| Custom | transformWithState |
– (s rozdíly chování) |
| Custom | union |
– (s omezeními) |
| Custom | forEach |
Nepodporováno |
| Custom | flatMapGroupsWithState |
Nepodporováno |
| Custom | mapPartitions |
Nepodporováno |
| Custom | forEachBatch |
Nepodporováno |
transformWithState v režimu v reálném čase
transformWithState je podporován v režimu v reálném čase s následujícími rozdíly od mikrodávkového zpracování:
-
handleInputRowsse vyvolá jednou pro každý řádek, nikoli jednou pro každý klíč v dávce.inputRowsiterátor vrací při každém vyvolání jednu hodnotu. - Časovače času událostí nejsou podporovány. Časovače času zpracování se spustí, když se dlouhotrvající dávka ukončí, pokud nedorazila žádná data.
-
transformWithStateInPandasse nepodporuje.
Uživatelsky definované funkce Pandas v reálném čase
Chcete-li minimalizovat latenci u rozhraní UDF pandas, nastavte spark.sql.execution.arrow.maxRecordsPerBatch na 1. To optimalizuje latenci na úkor propustnosti. Pokud je propustnost také důležitá, nastavte tuto hodnotu na 100 nebo vyšší.
Monitorování výkonu režimu v reálném čase
Režim v reálném čase zveřejňuje metriky latence v StreamingQueryProgress rámci latencies pole. K těmto metrikám můžete přistupovat prostřednictvím StreamingQueryListener nebo kontrolou vlastnosti lastProgress u streamovacího dotazu.
| Metrika | Description |
|---|---|
processingLatencyMs |
Doba mezi načtením záznamu tokem a jejím úplným zpracováním |
sourceQueuingLatencyMs |
Doba mezi okamžikem, kdy je záznam úspěšně zapsán do sběrnice zpráv (například čas přidání do logu v systému Kafka), a okamžikem, kdy jej datový tok poprvé přečte |
e2eLatencyMs |
Celková celková latence od okamžiku, kdy se záznam vytvoří ve zdroji až po jeho úplné zpracování tokem |
Každá metrika se hlásí jako percentily p50, p90, p95 a p99.
Omezení
Doporučuje se jeden tok v reálném čase na jednu pipeline. Je povoleno více toků, ale soupeření o sloty úloh mezi toky zvyšuje latenci.
Úplný seznam omezení operátorů a zdrojů najdete v tématu Omezení režimu v reálném čase.