Sdílet prostřednictvím


Režim v reálném čase ve strukturovaném streamování

Important

Tato funkce je ve verzi Public Preview.

Tato stránka popisuje režim v reálném čase, typ triggeru pro strukturované streamování, který umožňuje zpracování dat s ultra-nízkou latencí s kompletní latencí až do 5 ms. Tento režim je určený pro provozní úlohy, které vyžadují okamžitou reakci na streamovaná data.

Režim v reálném čase je k dispozici v Databricks Runtime 16.4 LTS a novější.

Provozní úlohy

Úlohy streamování je možné široce rozdělit do analytických úloh a provozních úloh:

  • Analytické úlohy využívají příjem a transformaci dat, obvykle podle medailové architektury (například vkládání dat do bronzových, stříbrných a zlatých tabulek).
  • Provozní úlohy využívají data v reálném čase, používají obchodní logiku a aktivují podřízené akce nebo rozhodnutí.

Mezi příklady provozních úloh patří:

  • Blokování nebo označení transakce platební kartou v reálném čase, pokud skóre podvodu překračuje prahovou hodnotu na základě faktorů, jako je neobvyklé umístění, velká velikost transakce nebo vzorce rychlé útraty.
  • Doručení propagační zprávy, když clickstreamová data ukazují, že uživatel po dobu pěti minut prochází džíny a nabízí 25% slevu, pokud si koupí během dalších 15 minut.

Obecně platí, že provozní úlohy jsou charakterizovány potřebou latence pod sekundu. Toho lze dosáhnout pomocí režimu v reálném čase ve strukturovaném streamování Apache Sparku.

Jak režim v reálném čase dosahuje nízké latence

Režim v reálném čase vylepšuje architekturu provádění pomocí následujících možností:

  • Spouštění dlouhotrvajících dávek (výchozí hodnota je 5 minut), ve kterých se data zpracovávají, jakmile jsou ve zdroji k dispozici.
  • Všechny fáze dotazu jsou naplánovány současně. To vyžaduje, aby počet dostupných úkolových slotů byl roven nebo větší než počet úkolů ve všech fázích dávky.
  • Data se mezi fázemi předávají, jakmile byla vytvořena pomocí streamového shuffle.

Na konci zpracování dávky a před spuštěním další dávky strukturované streamování ukládá kontrolní body pokroku a zpřístupňuje metriky poslední dávky. Pokud jsou dávky delší, můžou být tyto aktivity méně časté, což vede k delšímu opakování v případě selhání a zpoždění v dostupnosti metrik. Pokud jsou dávky menší, jsou tyto aktivity naopak častější, což může mít vliv na latenci. Databricks doporučuje porovnávat režim v reálném čase s cílovými úlohami a požadavky, abyste našli odpovídající interval aktivační události.

Konfigurace clusteru

Pokud chcete ve strukturovaném streamování používat režim v reálném čase, musíte nakonfigurovat klasickou úlohu Lakeflow:

  1. V pracovním prostoru Azure Databricks klikněte v levém horním rohu na Nový . Zvolte Další a klikněte na Cluster.

  2. Vymazat akceleraci Photonu.

  3. Zrušte zaškrtnutí políčka Povolit automatické škálování.

  4. V části Rozšířený výkonzrušte zaškrtnutí políčka Použít spotové instance.

  5. V Pokročilém režimu přístupu klepněte na Ruční a vyberte Vyhrazený (dříve: Jeden uživatel).

  6. V části Spark zadejte v konfiguraci Sparku následující:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Klikněte na Vytvořit.

Požadavky na velikost clusteru

Pokud má cluster dostatek slotů úloh, můžete spustit jednu úlohu v reálném čase na cluster.

Pokud chcete běžet v režimu s nízkou latencí, musí být celkový počet dostupných slotů úloh větší nebo roven počtu úkolů ve všech fázích dotazu.

Příklady výpočtů slotů

Bezstavový kanál s jednou fází (zdroj Kafka + jímka):

Pokud maxPartitions = 8, potřebujete alespoň 8 slotů. Pokud není parametr maxPartitions nastaven, použijte počet particí tématu Kafka.

Dvoufázový stavový kanál (zdroj Kafka + náhodné prohazování):

Pokud maxPartitions = 8 a shuffle partitions = 20, budete potřebovat 8 + 20 = 28 slotů.

Třífázový kanál (zdroj Kafka + shuffle + repartition):

Při maxPartitions = 8 a dvou fázích přeskupování po 20 potřebujete 8 + 20 + 20 = 48 slotů.

Klíčové aspekty

Při konfiguraci clusteru vezměte v úvahu následující skutečnosti:

  • Na rozdíl od režimu mikrodávkového režimu můžou úkoly v reálném čase zůstat nečinné při čekání na data, takže správné nastavení velikosti je nezbytné, aby nedocházelo k plýtvání zdroji.
  • Zaměřte se na úroveň cílového využití (např. 50%) laděním:
    • maxPartitions (pro Kafka)
    • spark.sql.shuffle.partitions (pro fáze náhodného prohazování)
  • Databricks doporučuje nastavit maxPartitions tak, aby každá úloha zpracovávala více Kafka partitionů ke snížení režie.
  • Upravte sloty úkolů na pracovníka tak, aby odpovídaly zátěži pro jednoduché jednofázové úlohy.
  • U úloh s vysokými nároky na shuffle experimentujte s nastavením minimálního počtu oddílů, které se vyhnou zdržení, a odtud jej upravte. Pokud cluster nemá dostatek slotů, úloha se neplánuje.

Note

Od Databricks Runtime 16.4 LTS a dále používají všechny pipelines v reálném čase kontrolní bod v2, což umožňuje bezproblémové přepínání mezi reálným časem a mikrodávkovým režimem.

Konfigurace dotazu

Aktivační událost v reálném čase musíte povolit, aby bylo možné určit, že se má dotaz spustit pomocí režimu s nízkou latencí. Triggery v reálném čase se navíc podporují jenom v režimu aktualizace. Například:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Dříve byla latence dotazu od začátku do konce úzce svázána s dobou zpracování dávky, což z ní činilo dobrý indikátor latence dotazu. Tato metoda se však už nepoužívá v režimu v reálném čase, což vyžaduje alternativní přístupy k měření latence. Koncová latence je specifická pro úlohy a někdy se dá přesně měřit pouze s podnikovou logikou. Pokud je například časové razítko zdroje vygenerováno v systému Kafka jako výstup, můžete latenci vypočítat jako rozdíl mezi časovým razítkem výstupu z Kafka a zdrojovým časovým razítkem.

Latenci mezi koncovými body můžete odhadnout několika způsoby na základě částečných informací shromážděných během procesu streamování.

Použijte StreamingQueryProgress

V události jsou zahrnuty StreamingQueryProgress následující metriky, které se automaticky protokolují v protokolech ovladačů. K nim můžete přistupovat také prostřednictvím funkce zpětného StreamingQueryListeneronQueryProgress() volání. QueryProgressEvent.json() nebo toString() zahrnují další metriky režimu v reálném čase.

  1. Latence zpracování (processingLatencyMs) Čas, který uplyne od okamžiku, kdy dotazování v režimu reálného času přečte záznam, až po jeho zápis do následující fáze zpracování. U dotazů s jednou fází se měří stejná doba trvání jako latence E2E. Tato metrika se hlásí pro každou úlohu.
  2. Latence fronty zdroje (sourceQueuingLatencyMs) Doba, která uplyne od chvíle, kdy je záznam úspěšně zapsán do sběrnice zpráv, například čas připojení protokolu v systému Kafka, a okamžik, kdy je záznam poprvé přečtený dotazem v režimu v reálném čase. Tato metrika se hlásí pro každou úlohu.
  3. E2E latence (e2eLatencyMs). Čas mezi tím, kdy se záznam úspěšně zapíše do sběrnice zpráv a kdy je záznam zapsán dotazem režimu reálného času. Tato metrika se agreguje pro každou dávku napříč všemi záznamy zpracovanými všemi procesy.

Například:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Použití rozhraní API Pozorování v úlohách

Rozhraní API pro sledování pomáhá měřit latenci bez spuštění jiné úlohy. Pokud máte časové razítko zdroje, které se blíží času příjezdu zdrojových dat a předává se před dosažením jímky, nebo pokud můžete najít způsob, jak časové razítko předat, můžete pomocí rozhraní API sledování odhadnout latenci každé dávky:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

V tomto příkladu se před výstupem položky zaznamená aktuální časové razítko a latence se odhaduje výpočtem rozdílu mezi tímto časovým razítkem a zdrojovým razítkem záznamu. Výsledky se zahrnou do průběžných zpráv a zpřístupní se posluchačům. Tady je ukázkový výstup:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Co je podporované?

Environments

Typ clusteru Supported
Dedicated (dříve: jeden uživatel) Yes
Standard (dříve: sdíleno) No
Klasické deklarativní kanály Lakeflow Spark No
Deklarativní kanály Sparku Lakeflow bez serveru No
Serverless No

Languages

Jazyk Supported
Scala Yes
Java Yes
Python Yes

Režimy spouštění

Režim spuštění Supported
Režim aktualizace Yes
režim přidávání No
Režim dokončení No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
EventHub (pomocí konektoru Kafka) Yes
Kinesis Ano (pouze režim EFO)
Google Pub/Sub (služba pro zasílání zpráv) No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
EventHub (pomocí konektoru Kafka) Yes
Kinesis No
Google Pub/Sub (služba pro zasílání zpráv) No
Apache Pulsar No
Libovolné jímky (pomocí forEachWriter) Yes

Operators

Operators Supported
Bezstavové operace
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Ano (s určitými omezeními)
  • Uživatelsky definovaná funkce v Pythonu
Ano (s určitými omezeními)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Agregační funkce Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • odstraněníDuplicit
Ano (stav je nevázaný)
  • OdstraněníDuplicitVeVodoznaku
No
Stream – spojení tabulek
  • Tabulka vysílání (by měla být malá)
Yes
Stream – Spojení streamů No
(plochý)MapGroupsWithState No
transformWithState Ano (s některými rozdíly)
sjednocení Ano (s určitými omezeními)
forEach Yes
forEachBatch No
mapPartitions Ne (viz omezení)

Použijte transformWithState v režimu reálného času

Pro vytváření vlastních stavových aplikací podporuje transformWithStateDatabricks rozhraní API ve strukturovaném streamování Apache Sparku. Další informace o rozhraní API a fragmentech kódu najdete v tématu Vytvoření vlastní stavové aplikace .

Existuje však několik rozdílů mezi tím, jak se rozhraní API chová v režimu v reálném čase a tradičními dotazy streamování, které využívají mikrodávkovou architekturu.

  • Metoda v reálném čase v režimu handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) se volá pro každý řádek.
    • inputRows Iterátor vrátí jednu hodnotu. V mikrodávkovém režimu se volá jednou pro každý klíč a inputRows iterátor vrátí všechny hodnoty klíče v mikro dávce.
    • Při psaní kódu musíte být obeznámeni s tímto rozdílem.
  • Časovače událostí nejsou podporovány v režimu v reálném čase.
  • V režimu v reálném čase jsou časovače zpožděné v závislosti na doručení dat. Jinak pokud neexistují žádná data, aktivuje se na konci dlouhotrvající dávky. Pokud se například časovač má spustit v 10:00:00 a současně nedojde k příjmu dat, neaktivuje se. Místo toho, pokud data přicházejí v 10:00:10, časovač se aktivuje se zpožděním 10 sekund. Nebo pokud žádná data nedorazí a dlouhotrvající dávka se ukončí, spustí časovač před ukončením dlouhotrvající dávky.

Uživatelem definované funkce Pythonu

Databricks podporuje většinu uživatelem definovaných funkcí Pythonu (UDF) v režimu v reálném čase:

Typ UDF Supported
Bezstavová UDF
  • Skalární UDF v Pythonu (odkaz)
Yes
  • Skalární UDF se šipkami
Yes
  • Pandas Skalární UDF (odkaz)
Yes
  • Funkce Šipková (mapInArrow)
Yes
Yes
Stavové seskupení UDF (UDAF)
  • transformWithState (POZNÁMKA: pouze Row rozhraní)
Yes
  • applyInPandasWithState
No
Bezstavové seskupení UDF (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
funkce tabulky
No
UC UDF No

Při používání UDFs v Pythonu v režimu reálného času je potřeba zvážit několik aspektů:

  • Pokud chcete minimalizovat latenci, nakonfigurujte velikost dávky Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
    • Kompromis: Tato konfigurace optimalizuje latenci na úkor propustnosti. U většiny úloh se toto nastavení doporučuje.
    • Zvětšete velikost dávky pouze tehdy, když je požadována vyšší propustnost, aby se přizpůsobilo množství vstupních dat, při přijetí potenciálního zvýšení latence.
  • UDF a funkce Pandas nefungují dobře s velikostí dávky Arrow 1.
    • Pokud používáte UDF nebo funkce pandas, nastavte velikost dávky Arrow na vyšší hodnotu (například 100 nebo vyšší).
    • Všimněte si, že to znamená vyšší latenci. Databricks doporučuje používat funkci UDF nebo šipky, pokud je to možné.
  • Kvůli problému s výkonem knihovny pandas se transformWithState podporuje pouze v Row rozhraní.

Techniky optimalizace

Technika Ve výchozím nastavení povoleno
Asynchronní sledování průběhu: Přesune zápis do posunového protokolu a protokolu závazků do asynchronního vlákna, čímž se sníží doba mezi dvěma mikrodávkami. To může pomoct snížit latenci bezstavových dotazů streamování. No
Asynchronní vytváření kontrolních bodů stavu: Pomáhá snížit latenci stavových streamovacích dotazů tím, že začne zpracovávat další mikrodávku hned po dokončení výpočtu předchozí mikrodávkové dávky bez čekání na kontrolní body stavu. No

Limitations

Omezení zdroje

U Kinesis se režim dotazování nepodporuje. Kromě toho může časté předělování negativně ovlivnit latenci.

Omezení Unie

Pro Unii platí určitá omezení:

  • Sjednocení sám se sebou není podporováno:
    • Kafka: Nemůžete použít stejný zdrojový objekt datového rámce a sjednocovat odvozené datové rámce z něj. Alternativní řešení: Použijte různé datové rámce, které se čtou ze stejného zdroje.
    • Kinéza: Nelze sjednocovat datové rámce odvozené ze stejného zdroje Kinesis se stejnou konfigurací. Alternativní řešení: Kromě použití různých datových rámců můžete každému datovému rámci přiřadit jinou možnost consumerName.
  • Stavové operátory (například aggregate, deduplicate, transformWithState) definované před Sjednocením nejsou podporovány.
  • Sjednocení s dávkovými zdroji není podporováno.

Omezení MapPartitions

mapPartitions v jazyce Scala a podobných rozhraních API Pythonu (mapInPandas, mapInArrow) přebírají iterátor celého vstupního oddílu a vytvoří iterátor pro celý výstup s libovolným mapováním mezi vstupem a výstupem. Tato rozhraní API můžou způsobit problémy s výkonem v režimu streamování Real-Time tím, že zablokují celý výstup, což zvyšuje latenci. Sémantika těchto rozhraní API nepodporuje šíření vodoznaku dobře.

K dosažení podobných funkcí použijte skalární UDF v kombinaci s Transformace složitých datových typů nebo filter místo toho.

Examples

Následující příklady ukazují podporované dotazy.

Bezstavové dotazy

Podporují se všechny bezstavové dotazy s jednou nebo více fázemi.

Zdroj Kafka do jímky Kafka

V tomto příkladu čtete ze zdroje Kafka a zapisujete do sinku Kafka.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Rozdělení disku

V tomto příkladu čtete ze zdroje Kafka, znovu rozdělíte data do 20 oddílů a zapíšete do jímky Kafka.

Před použitím přerozdělení nastavte konfiguraci Sparku spark.sql.execution.sortBeforeRepartition na false.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Připojení snímku streamu (pouze broadcast)

V tomto příkladu načtete ze systému Kafka, spojíte data se statickou tabulkou a zapíšete je do jímky Kafka. Mějte na paměti, že podporovaná jsou pouze statická spojení streamu, která vysílají statickou tabulku, což znamená, že statická tabulka by se měla vejít do paměti.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Zdroj kinesis do jímky Kafka

V tomto příkladu si přečtete ze zdroje Kinesis a zapíšete do jímky Kafka.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

V tomto příkladu sjednocujete dva datové rámce Kafka ze dvou různých témat a zapíšete je do jímky Kafka.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Stavové dotazy

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Sjednocení s agregací

V tomto příkladu nejprve sjednocujete dva datové rámce Kafka ze dvou různých témat a pak provedete agregaci. Nakonec napíšete do jímky Kafka.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Existuje rozdíl mezi tím, jak režim v reálném čase a jiné režimy spouštění ve strukturovaném streamování spouští StatefulProcessor v transformWithState. Viz Použití transformWithState v režimu v reálném čase

TransformWithState (PySpark, rozhraní řádku)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Existuje rozdíl v tom, jak režim v reálném čase a ostatní režimy spouštění ve strukturálním streamování provádějí StatefulProcessor v transformWithState. Viz Použití transformWithState v režimu v reálném čase

Sinks

Zápis do Postgresu přes foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Tato funkce je dostupná v Databricks Runtime 17.1 a novějších verzích.

Zdroj rychlosti zobrazení

V tomto příkladu si přečtete ze zdroje rychlosti a zobrazíte datový rámec streamování v poznámkovém bloku.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())