Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Important
Tato funkce je ve verzi Public Preview.
Tato stránka popisuje režim v reálném čase, typ triggeru pro strukturované streamování, který umožňuje zpracování dat s ultra-nízkou latencí s kompletní latencí až do 5 ms. Tento režim je určený pro provozní úlohy, které vyžadují okamžitou reakci na streamovaná data.
Režim v reálném čase je k dispozici v Databricks Runtime 16.4 LTS a novější.
Provozní úlohy
Úlohy streamování je možné široce rozdělit do analytických úloh a provozních úloh:
- Analytické úlohy využívají příjem a transformaci dat, obvykle podle medailové architektury (například vkládání dat do bronzových, stříbrných a zlatých tabulek).
- Provozní úlohy využívají data v reálném čase, používají obchodní logiku a aktivují podřízené akce nebo rozhodnutí.
Mezi příklady provozních úloh patří:
- Blokování nebo označení transakce platební kartou v reálném čase, pokud skóre podvodu překračuje prahovou hodnotu na základě faktorů, jako je neobvyklé umístění, velká velikost transakce nebo vzorce rychlé útraty.
- Doručení propagační zprávy, když clickstreamová data ukazují, že uživatel po dobu pěti minut prochází džíny a nabízí 25% slevu, pokud si koupí během dalších 15 minut.
Obecně platí, že provozní úlohy jsou charakterizovány potřebou latence pod sekundu. Toho lze dosáhnout pomocí režimu v reálném čase ve strukturovaném streamování Apache Sparku.
Jak režim v reálném čase dosahuje nízké latence
Režim v reálném čase vylepšuje architekturu provádění pomocí následujících možností:
- Spouštění dlouhotrvajících dávek (výchozí hodnota je 5 minut), ve kterých se data zpracovávají, jakmile jsou ve zdroji k dispozici.
- Všechny fáze dotazu jsou naplánovány současně. To vyžaduje, aby počet dostupných úkolových slotů byl roven nebo větší než počet úkolů ve všech fázích dávky.
- Data se mezi fázemi předávají, jakmile byla vytvořena pomocí streamového shuffle.
Na konci zpracování dávky a před spuštěním další dávky strukturované streamování ukládá kontrolní body pokroku a zpřístupňuje metriky poslední dávky. Pokud jsou dávky delší, můžou být tyto aktivity méně časté, což vede k delšímu opakování v případě selhání a zpoždění v dostupnosti metrik. Pokud jsou dávky menší, jsou tyto aktivity naopak častější, což může mít vliv na latenci. Databricks doporučuje porovnávat režim v reálném čase s cílovými úlohami a požadavky, abyste našli odpovídající interval aktivační události.
Konfigurace clusteru
Pokud chcete ve strukturovaném streamování používat režim v reálném čase, musíte nakonfigurovat klasickou úlohu Lakeflow:
V pracovním prostoru Azure Databricks klikněte v levém horním rohu na Nový . Zvolte Další a klikněte na Cluster.
Vymazat akceleraci Photonu.
Zrušte zaškrtnutí políčka Povolit automatické škálování.
V části Rozšířený výkonzrušte zaškrtnutí políčka Použít spotové instance.
V Pokročilém režimu přístupu klepněte na Ruční a vyberte Vyhrazený (dříve: Jeden uživatel).
V části Spark zadejte v konfiguraci Sparku následující:
spark.databricks.streaming.realTimeMode.enabled trueKlikněte na Vytvořit.
Požadavky na velikost clusteru
Pokud má cluster dostatek slotů úloh, můžete spustit jednu úlohu v reálném čase na cluster.
Pokud chcete běžet v režimu s nízkou latencí, musí být celkový počet dostupných slotů úloh větší nebo roven počtu úkolů ve všech fázích dotazu.
Příklady výpočtů slotů
Bezstavový kanál s jednou fází (zdroj Kafka + jímka):
Pokud maxPartitions = 8, potřebujete alespoň 8 slotů. Pokud není parametr maxPartitions nastaven, použijte počet particí tématu Kafka.
Dvoufázový stavový kanál (zdroj Kafka + náhodné prohazování):
Pokud maxPartitions = 8 a shuffle partitions = 20, budete potřebovat 8 + 20 = 28 slotů.
Třífázový kanál (zdroj Kafka + shuffle + repartition):
Při maxPartitions = 8 a dvou fázích přeskupování po 20 potřebujete 8 + 20 + 20 = 48 slotů.
Klíčové aspekty
Při konfiguraci clusteru vezměte v úvahu následující skutečnosti:
- Na rozdíl od režimu mikrodávkového režimu můžou úkoly v reálném čase zůstat nečinné při čekání na data, takže správné nastavení velikosti je nezbytné, aby nedocházelo k plýtvání zdroji.
- Zaměřte se na úroveň cílového využití (např. 50%) laděním:
-
maxPartitions(pro Kafka) -
spark.sql.shuffle.partitions(pro fáze náhodného prohazování)
-
- Databricks doporučuje nastavit maxPartitions tak, aby každá úloha zpracovávala více Kafka partitionů ke snížení režie.
- Upravte sloty úkolů na pracovníka tak, aby odpovídaly zátěži pro jednoduché jednofázové úlohy.
- U úloh s vysokými nároky na shuffle experimentujte s nastavením minimálního počtu oddílů, které se vyhnou zdržení, a odtud jej upravte. Pokud cluster nemá dostatek slotů, úloha se neplánuje.
Note
Od Databricks Runtime 16.4 LTS a dále používají všechny pipelines v reálném čase kontrolní bod v2, což umožňuje bezproblémové přepínání mezi reálným časem a mikrodávkovým režimem.
Konfigurace dotazu
Aktivační událost v reálném čase musíte povolit, aby bylo možné určit, že se má dotaz spustit pomocí režimu s nízkou latencí. Triggery v reálném čase se navíc podporují jenom v režimu aktualizace. Například:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Dříve byla latence dotazu od začátku do konce úzce svázána s dobou zpracování dávky, což z ní činilo dobrý indikátor latence dotazu. Tato metoda se však už nepoužívá v režimu v reálném čase, což vyžaduje alternativní přístupy k měření latence. Koncová latence je specifická pro úlohy a někdy se dá přesně měřit pouze s podnikovou logikou. Pokud je například časové razítko zdroje vygenerováno v systému Kafka jako výstup, můžete latenci vypočítat jako rozdíl mezi časovým razítkem výstupu z Kafka a zdrojovým časovým razítkem.
Latenci mezi koncovými body můžete odhadnout několika způsoby na základě částečných informací shromážděných během procesu streamování.
Použijte StreamingQueryProgress
V události jsou zahrnuty StreamingQueryProgress následující metriky, které se automaticky protokolují v protokolech ovladačů. K nim můžete přistupovat také prostřednictvím funkce zpětného StreamingQueryListeneronQueryProgress() volání.
QueryProgressEvent.json() nebo toString() zahrnují další metriky režimu v reálném čase.
- Latence zpracování (processingLatencyMs) Čas, který uplyne od okamžiku, kdy dotazování v režimu reálného času přečte záznam, až po jeho zápis do následující fáze zpracování. U dotazů s jednou fází se měří stejná doba trvání jako latence E2E. Tato metrika se hlásí pro každou úlohu.
- Latence fronty zdroje (sourceQueuingLatencyMs) Doba, která uplyne od chvíle, kdy je záznam úspěšně zapsán do sběrnice zpráv, například čas připojení protokolu v systému Kafka, a okamžik, kdy je záznam poprvé přečtený dotazem v režimu v reálném čase. Tato metrika se hlásí pro každou úlohu.
- E2E latence (e2eLatencyMs). Čas mezi tím, kdy se záznam úspěšně zapíše do sběrnice zpráv a kdy je záznam zapsán dotazem režimu reálného času. Tato metrika se agreguje pro každou dávku napříč všemi záznamy zpracovanými všemi procesy.
Například:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Použití rozhraní API Pozorování v úlohách
Rozhraní API pro sledování pomáhá měřit latenci bez spuštění jiné úlohy. Pokud máte časové razítko zdroje, které se blíží času příjezdu zdrojových dat a předává se před dosažením jímky, nebo pokud můžete najít způsob, jak časové razítko předat, můžete pomocí rozhraní API sledování odhadnout latenci každé dávky:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
V tomto příkladu se před výstupem položky zaznamená aktuální časové razítko a latence se odhaduje výpočtem rozdílu mezi tímto časovým razítkem a zdrojovým razítkem záznamu. Výsledky se zahrnou do průběžných zpráv a zpřístupní se posluchačům. Tady je ukázkový výstup:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Co je podporované?
Environments
| Typ clusteru | Supported |
|---|---|
| Dedicated (dříve: jeden uživatel) | Yes |
| Standard (dříve: sdíleno) | No |
| Klasické deklarativní kanály Lakeflow Spark | No |
| Deklarativní kanály Sparku Lakeflow bez serveru | No |
| Serverless | No |
Languages
| Jazyk | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Režimy spouštění
| Režim spuštění | Supported |
|---|---|
| Režim aktualizace | Yes |
| režim přidávání | No |
| Režim dokončení | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| EventHub (pomocí konektoru Kafka) | Yes |
| Kinesis | Ano (pouze režim EFO) |
| Google Pub/Sub (služba pro zasílání zpráv) | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| EventHub (pomocí konektoru Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub (služba pro zasílání zpráv) | No |
| Apache Pulsar | No |
| Libovolné jímky (pomocí forEachWriter) | Yes |
Operators
| Operators | Supported |
|---|---|
| Bezstavové operace | |
|
Yes |
|
Yes |
| UDFs | |
|
Ano (s určitými omezeními) |
|
Ano (s určitými omezeními) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Agregační funkce | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Ano (stav je nevázaný) |
|
No |
| Stream – spojení tabulek | |
|
Yes |
| Stream – Spojení streamů | No |
| (plochý)MapGroupsWithState | No |
| transformWithState | Ano (s některými rozdíly) |
| sjednocení | Ano (s určitými omezeními) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Ne (viz omezení) |
Použijte transformWithState v režimu reálného času
Pro vytváření vlastních stavových aplikací podporuje transformWithStateDatabricks rozhraní API ve strukturovaném streamování Apache Sparku. Další informace o rozhraní API a fragmentech kódu najdete v tématu Vytvoření vlastní stavové aplikace .
Existuje však několik rozdílů mezi tím, jak se rozhraní API chová v režimu v reálném čase a tradičními dotazy streamování, které využívají mikrodávkovou architekturu.
- Metoda v reálném čase v režimu
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)se volá pro každý řádek.-
inputRowsIterátor vrátí jednu hodnotu. V mikrodávkovém režimu se volá jednou pro každý klíč ainputRowsiterátor vrátí všechny hodnoty klíče v mikro dávce. - Při psaní kódu musíte být obeznámeni s tímto rozdílem.
-
- Časovače událostí nejsou podporovány v režimu v reálném čase.
- V režimu v reálném čase jsou časovače zpožděné v závislosti na doručení dat. Jinak pokud neexistují žádná data, aktivuje se na konci dlouhotrvající dávky. Pokud se například časovač má spustit v 10:00:00 a současně nedojde k příjmu dat, neaktivuje se. Místo toho, pokud data přicházejí v 10:00:10, časovač se aktivuje se zpožděním 10 sekund. Nebo pokud žádná data nedorazí a dlouhotrvající dávka se ukončí, spustí časovač před ukončením dlouhotrvající dávky.
Uživatelem definované funkce Pythonu
Databricks podporuje většinu uživatelem definovaných funkcí Pythonu (UDF) v režimu v reálném čase:
| Typ UDF | Supported |
|---|---|
| Bezstavová UDF | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Stavové seskupení UDF (UDAF) | |
|
Yes |
|
No |
| Bezstavové seskupení UDF (UDAF) | |
|
No |
|
No |
|
No |
| funkce tabulky | |
|
No |
| UC UDF | No |
Při používání UDFs v Pythonu v režimu reálného času je potřeba zvážit několik aspektů:
- Pokud chcete minimalizovat latenci, nakonfigurujte velikost dávky Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
- Kompromis: Tato konfigurace optimalizuje latenci na úkor propustnosti. U většiny úloh se toto nastavení doporučuje.
- Zvětšete velikost dávky pouze tehdy, když je požadována vyšší propustnost, aby se přizpůsobilo množství vstupních dat, při přijetí potenciálního zvýšení latence.
- UDF a funkce Pandas nefungují dobře s velikostí dávky Arrow 1.
- Pokud používáte UDF nebo funkce pandas, nastavte velikost dávky Arrow na vyšší hodnotu (například 100 nebo vyšší).
- Všimněte si, že to znamená vyšší latenci. Databricks doporučuje používat funkci UDF nebo šipky, pokud je to možné.
- Kvůli problému s výkonem knihovny pandas se transformWithState podporuje pouze v
Rowrozhraní.
Techniky optimalizace
| Technika | Ve výchozím nastavení povoleno |
|---|---|
| Asynchronní sledování průběhu: Přesune zápis do posunového protokolu a protokolu závazků do asynchronního vlákna, čímž se sníží doba mezi dvěma mikrodávkami. To může pomoct snížit latenci bezstavových dotazů streamování. | No |
| Asynchronní vytváření kontrolních bodů stavu: Pomáhá snížit latenci stavových streamovacích dotazů tím, že začne zpracovávat další mikrodávku hned po dokončení výpočtu předchozí mikrodávkové dávky bez čekání na kontrolní body stavu. | No |
Limitations
Omezení zdroje
U Kinesis se režim dotazování nepodporuje. Kromě toho může časté předělování negativně ovlivnit latenci.
Omezení Unie
Pro Unii platí určitá omezení:
- Sjednocení sám se sebou není podporováno:
- Kafka: Nemůžete použít stejný zdrojový objekt datového rámce a sjednocovat odvozené datové rámce z něj. Alternativní řešení: Použijte různé datové rámce, které se čtou ze stejného zdroje.
- Kinéza: Nelze sjednocovat datové rámce odvozené ze stejného zdroje Kinesis se stejnou konfigurací. Alternativní řešení: Kromě použití různých datových rámců můžete každému datovému rámci přiřadit jinou možnost consumerName.
- Stavové operátory (například
aggregate,deduplicate,transformWithState) definované před Sjednocením nejsou podporovány. - Sjednocení s dávkovými zdroji není podporováno.
Omezení MapPartitions
mapPartitions v jazyce Scala a podobných rozhraních API Pythonu (mapInPandas, mapInArrow) přebírají iterátor celého vstupního oddílu a vytvoří iterátor pro celý výstup s libovolným mapováním mezi vstupem a výstupem. Tato rozhraní API můžou způsobit problémy s výkonem v režimu streamování Real-Time tím, že zablokují celý výstup, což zvyšuje latenci. Sémantika těchto rozhraní API nepodporuje šíření vodoznaku dobře.
K dosažení podobných funkcí použijte skalární UDF v kombinaci s Transformace složitých datových typů nebo filter místo toho.
Examples
Následující příklady ukazují podporované dotazy.
Bezstavové dotazy
Podporují se všechny bezstavové dotazy s jednou nebo více fázemi.
Zdroj Kafka do jímky Kafka
V tomto příkladu čtete ze zdroje Kafka a zapisujete do sinku Kafka.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Rozdělení disku
V tomto příkladu čtete ze zdroje Kafka, znovu rozdělíte data do 20 oddílů a zapíšete do jímky Kafka.
Před použitím přerozdělení nastavte konfiguraci Sparku spark.sql.execution.sortBeforeRepartition na false.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Připojení snímku streamu (pouze broadcast)
V tomto příkladu načtete ze systému Kafka, spojíte data se statickou tabulkou a zapíšete je do jímky Kafka. Mějte na paměti, že podporovaná jsou pouze statická spojení streamu, která vysílají statickou tabulku, což znamená, že statická tabulka by se měla vejít do paměti.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Zdroj kinesis do jímky Kafka
V tomto příkladu si přečtete ze zdroje Kinesis a zapíšete do jímky Kafka.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
V tomto příkladu sjednocujete dva datové rámce Kafka ze dvou různých témat a zapíšete je do jímky Kafka.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Stavové dotazy
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Sjednocení s agregací
V tomto příkladu nejprve sjednocujete dva datové rámce Kafka ze dvou různých témat a pak provedete agregaci. Nakonec napíšete do jímky Kafka.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Existuje rozdíl mezi tím, jak režim v reálném čase a jiné režimy spouštění ve strukturovaném streamování spouští StatefulProcessor v transformWithState. Viz Použití transformWithState v režimu v reálném čase
TransformWithState (PySpark, rozhraní řádku)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Existuje rozdíl v tom, jak režim v reálném čase a ostatní režimy spouštění ve strukturálním streamování provádějí StatefulProcessor v transformWithState. Viz Použití transformWithState v režimu v reálném čase
Sinks
Zápis do Postgresu přes foreachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Tato funkce je dostupná v Databricks Runtime 17.1 a novějších verzích.
Zdroj rychlosti zobrazení
V tomto příkladu si přečtete ze zdroje rychlosti a zobrazíte datový rámec streamování v poznámkovém bloku.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())