Freigeben über


Echtzeitmodus im strukturierten Streaming

Important

Dieses Feature befindet sich in der Public Preview.

Diese Seite beschreibt den Echtzeitmodus, einen Triggertyp für strukturiertes Streaming, der die Verarbeitung von Daten mit besonders niedriger Latenz (End-to-End-Latenz nur 5 ms) ermöglicht. Dieser Modus wurde für betriebsbereite Workloads entwickelt, die sofortige Reaktion auf Streamingdaten erfordern.

Der Echtzeitmodus ist in Databricks Runtime 16.4 LTS und höher verfügbar.

Betriebsarbeitslasten

Streaming-Workloads können allgemein in analytische Workloads und betriebsbereite Workloads unterteilt werden:

  • Analytische Workloads verwenden Datenaufnahme und Transformation, in der Regel nach der Medallion-Architektur (z. B. Das Aufnehmen von Daten in die Bronze-, Silber- und Goldtabellen).
  • Betriebsarbeitslasten verbrauchen Echtzeitdaten, wenden Geschäftslogik an und lösen nachgeschaltete Aktionen oder Entscheidungen aus.

Einige Beispiele für betriebliche Arbeitslasten sind:

  • Blockieren oder Kennzeichnen einer Kreditkartentransaktion in Echtzeit, wenn eine Betrugsbewertung einen Schwellenwert überschreitet, basierend auf Faktoren wie ungewöhnlichem Standort, großer Transaktionsgröße oder schnellen Ausgabenmustern.
  • Die Übermittlung einer Werbenachricht erfolgt, wenn Clickstream-Daten zeigen, dass ein Benutzer seit fünf Minuten nach Jeans durchsucht, und es wird ein Rabatt von 25 % angeboten, wenn der Benutzer innerhalb der nächsten 15 Minuten einen Kauf tätigt.

Im Allgemeinen zeichnen sich operative Arbeitslasten durch die Notwendigkeit einer subsekundären End-to-End-Latenz aus. Dies kann mit dem Echtzeitmodus in Apache Spark Structured Streaming erreicht werden.

So erzielt der Echtzeitmodus eine niedrige Latenz

Der Echtzeitmodus verbessert die Ausführungsarchitektur durch:

  • Ausführen langer Batches (der Standardwert ist 5 Minuten), in denen Daten verarbeitet werden, sobald sie in der Quelle verfügbar sind.
  • Alle Phasen der Abfrage werden gleichzeitig geplant. Dies erfordert, dass die Anzahl der verfügbaren Vorgangsplätze gleich oder größer als die Anzahl der Vorgänge aller Phasen in einem Batch ist.
  • Die Daten werden zwischen den Phasen übergeben, sobald sie mit einem Streamingshuffle erzeugt wurden.

Am Ende der Verarbeitung eines Batches und vor dem Beginn des nächsten Batches erfasst das Strukturierte Streaming den Fortschritt an Checkpoints und stellt Metriken für den letzten Batch zur Verfügung. Wenn die Batches länger sind, sind diese Aktivitäten möglicherweise weniger häufig, was zu längeren Wiedergaben im Falle eines Fehlers und einer Verzögerung bei der Verfügbarkeit von Metriken führt. Wenn die Batches dagegen kleiner sind, werden diese Aktivitäten häufiger, was sich möglicherweise auf die Latenz auswirkt. Databricks empfiehlt, den Echtzeitmodus mit Ihrer Zielarbeitsauslastung und den Anforderungen zu vergleichen, um das entsprechende Triggerintervall zu finden.

Clusterkonfiguration

Um den Echtzeitmodus im strukturierten Streaming zu verwenden, müssen Sie einen klassischen Lakeflow-Auftrag konfigurieren:

  1. Klicken Sie in Ihrem Azure Databricks-Arbeitsbereich in der oberen linken Ecke auf "Neu ". Wählen Sie "Weitere " aus, und klicken Sie auf "Cluster".

  2. Photonbeschleunigung löschen.

  3. Deaktivieren Sie Autoskalierung aktivieren.

  4. Deaktivieren Sie unter Erweiterte Leistungsüberwachung die Option Spot-Instanzen verwenden.

  5. Klicken Sie im Modus "Erweitert " und "Zugriff" auf "Manuell ", und wählen Sie " Dediziert" (vormals: Einzelbenutzer) aus.

  6. Geben Sie unter Spark Folgendes unter Spark-Konfiguration ein:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Klicken Sie auf "Erstellen".

Anforderungen an die Clustergröße

Sie können einen Echtzeitauftrag pro Cluster ausführen, wenn der Cluster über genügend Aufgabenplätze verfügt.

Um im Modus mit geringer Latenz ausgeführt zu werden, muss die Gesamtanzahl der verfügbaren Aufgabenplätze größer oder gleich der Anzahl der Aufgaben in allen Abfragephasen sein.

Slot-Berechnungsbeispiele

Einstufige zustandslose Pipeline (Kafka-Quelle und -Senke):

Wenn maxPartitions = 8, benötigen Sie mindestens 8 Steckplätze. Wenn "maxPartitions" nicht festgelegt ist, verwenden Sie die Anzahl der Kafka-Themenpartitionen.

Zweistufige zustandsbehaftete Pipeline (Kafka-Quelle und -Shuffle):

Wenn maxPartitions = 8 und shuffle partitions = 20, benötigen Sie 8 + 20 = 28 Steckplätze.

Dreistufige Pipeline (Kafka-Quelle + Shuffle + Neuverteilung)

Bei maxPartitions = 8 und zwei Shuffle-Stufen mit jeweils 20, benötigen Sie insgesamt 8 + 20 + 20 = 48 Steckplätze.

Wichtige Überlegungen

Berücksichtigen Sie beim Konfigurieren des Clusters Folgendes:

  • Im Gegensatz zum Mikrobatchmodus können Echtzeitaufgaben im Leerlauf bleiben, während sie auf Daten warten. Daher ist die richtige Größenanpassung unerlässlich, um verschwendete Ressourcen zu vermeiden.
  • Anstreben einer Zielauslastung (z. B. 50 %) durch Optimierung:
    • maxPartitions (für Kafka)
    • spark.sql.shuffle.partitions (für Shuffle-Phasen)
  • Databricks empfiehlt das Festlegen von maxPartitions, sodass jeder Vorgang mehrere Kafka-Partitionen verarbeitet, um den Aufwand zu reduzieren.
  • Passen Sie die Aufgabenslots pro Worker für einfache einstufige Aufträge an die Workload an.
  • Experimentieren Sie bei Shuffle-intensiven Aufträgen, um die minimale Anzahl an Shufflepartitionen zu ermitteln, mit der Rückstände vermieden werden können. Nehmen Sie entsprechende Anpassungen vor. Der Auftrag wird nicht eingeplant, wenn der Cluster nicht über genügend Slots verfügt.

Note

Von Databricks Runtime 16.4 LTS und höher verwenden alle Echtzeitpipelinen Prüfpunkt v2, was einen nahtlosen Wechsel zwischen Echtzeit- und Mikrobatchmodi ermöglicht.

Abfragekonfiguration

Sie müssen den Echtzeittrigger aktivieren, um anzugeben, dass eine Abfrage mit dem Modus mit geringer Latenz ausgeführt werden soll. Darüber hinaus werden Echtzeittrigger nur im Updatemodus unterstützt. Beispiel:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Früher war die End-to-End-Abfragelatenz eng mit der Batchdauer verknüpft, die Batchdauer war also ein guter Indikator für die Abfragelatenz. Diese Methode gilt jedoch nicht mehr im Echtzeitmodus und erfordert alternative Ansätze zur Messung der Latenz. End-to-End-Latenz ist arbeitslastspezifisch und kann manchmal nur mit Geschäftslogik genau gemessen werden. Wenn z. B. der Quellzeitstempel in Kafka ausgegeben wird, kann die Latenz als Differenz zwischen dem Ausgabezeitstempel von Kafka und dem Quellzeitstempel berechnet werden.

Sie können die End-to-End-Latenz auf verschiedene Weise basierend auf teilweisen Informationen schätzen, die während des Streamingprozesses gesammelt wurden.

Verwenden von StreamingQueryProgress

Die folgenden Metriken sind im StreamingQueryProgress Ereignis enthalten, das automatisch in den Treiberprotokollen protokolliert wird. Sie können auch über die StreamingQueryListener-onQueryProgress()-Rückruffunktion darauf zugreifen. QueryProgressEvent.json() oder toString() zusätzliche Echtzeitmodusmetriken enthalten.

  1. Verarbeitungslatenz (processingLatencyMs). Die zwischen dem Lesen eines Datensatzes durch eine Abfrage im Echtzeitmodus bis zum Schreiben in die nächste Phase oder die nachgelagerte Etappe verstrichene Zeit. Bei Einzelstufenabfragen misst dies die gleiche Dauer wie die E2E-Latenz. Diese Metrik wird pro Vorgang gemeldet.
  2. Quellwarteschlangenlatenz (sourceQueuingLatencyMs). Die Zeitspanne zwischen dem erfolgreichen Schreiben eines Datensatzes in einen Nachrichtenbus (z. B. Zeit zum Ergänzen des Protokolls in Kafka) und dem ersten Auslesen des Datensatzes durch eine Abfrage im Echtzeitmodus. Diese Metrik wird pro Vorgang gemeldet.
  3. E2E-Latenz (e2eLatencyMs). Die Zeit zwischen dem erfolgreichen Schreiben des Datensatzes in einen Nachrichtenbus und dem Zeitpunkt, zu dem der Datensatz von der Echtzeitmodus-Abfrage weitergeschrieben wird. Diese Metrik wird pro Batch für alle Datensätze aggregiert, die von allen Vorgängen verarbeitet werden.

Beispiel:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Verwenden der Observe-API in Aufträgen

Die Observe-API hilft beim Messen der Latenz, ohne einen anderen Auftrag zu starten. Wenn ein Quellzeitstempel mit der ungefähren Ankunftszeit der Quelldaten vor dem Erreichen der Senke übergeben wird, oder wenn Sie einen Weg finden, den Zeitstempel zu übergeben, können Sie die Latenz jedes Batches mit der Observe-API schätzen.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In diesem Beispiel wird ein aktueller Zeitstempel vor der Ausgabe des Eintrags aufgezeichnet, und die Latenz wird geschätzt, indem die Differenz zwischen diesem Zeitstempel und dem Quellzeitstempel des Datensatzes berechnet wird. Die Ergebnisse werden in Fortschrittsberichten aufgenommen und den Zuhörern zur Verfügung gestellt. Hier ist eine Beispielausgabe:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Was wird unterstützt?

Environments

Clustertyp Supported
Dedizierter (früher: einzelner Benutzer) Yes
Standard (früher: geteilt) No
Lakeflow Spark Declarative Pipelines Classic No
Lakeflow Spark Deklarative Pipelines Serverlos No
Serverless No

Languages

Language Supported
Scala Yes
Java Yes
Python Yes

Ausführungsmodi

Ausführungsmodus Supported
Updatemodus Yes
Append mode No
Vollständiger Modus No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (mit Kafka Connector) Yes
Kinesis Ja (nur EFO-Modus)
Google Pub/Sub (Nachrichtendienst) No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (mit Kafka Connector) Yes
Kinesis No
Google Pub/Sub (Nachrichtendienst) No
Apache Pulsar No
Beliebige Senken (mit forEachWriter) Yes

Operators

Operators Supported
Zustandslose Vorgänge
  • Selection
Yes
  • Projection
Yes
UDFs
  • Skala UDF
Ja (mit einigen Einschränkungen)
  • Python-Benutzerdefinierte Funktion (UDF)
Ja (mit einigen Einschränkungen)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Aggregationsfunktionen Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Ja (der Zustand ist ungebunden)
  • dropDuplicatesWithinWatermark
No
Stream – Tabellenverknüpfung
  • Übertragungstabelle (sollte klein sein)
Yes
Stream – Stream-Join No
(flach)MapGroupsWithState No
transformWithState Ja (mit einigen Unterschieden)
union Ja (mit einigen Einschränkungen)
forEach Yes
forEachBatch No
mapPartitions Nein (siehe Einschränkung)

Verwenden von transformWithState im Echtzeitmodus

Zum Erstellen von benutzerdefinierten zustandsbehafteten Anwendungen unterstützt Databricks transformWithState, eine API in Apache Spark Structured Streaming. Weitere Informationen zu API- und Codeausschnitten finden Sie unter Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung .

Es gibt jedoch einige Unterschiede zwischen dem Verhalten der API im Echtzeitmodus und herkömmlichen Streamingabfragen, die die Mikrobatcharchitektur nutzen.

  • Die Methode im Echtzeitmodus handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) wird für jede Zeile aufgerufen.
    • Der inputRows Iterator gibt einen einzelnen Wert zurück. Im Mikrobatchmodus wird er einmal für jeden Schlüssel aufgerufen, und der inputRows Iterator gibt alle Werte für einen Schlüssel im Mikrobatch zurück.
    • Sie müssen diesen Unterschied erkennen, wenn Sie ihren Code schreiben.
  • Ereigniszeitgeber werden im Echtzeitmodus nicht unterstützt.
  • Im Echtzeitmodus wird das Auslösen von Timern je nach Ankunft der Daten verzögert. Andernfalls, wenn keine Daten vorhanden sind, wird es am Ende des langandauernden Batches ausgelöst. Wenn ein Timer beispielsweise um 10:00:00 Uhr feuern soll und gleichzeitig keine Daten eintreffen, wird er nicht ausgelöst. Wenn die Daten stattdessen um 10:00:10 eintreffen, wird der Timer mit einer Verzögerung von 10 Sekunden ausgelöst. Oder – falls keine Daten ankommen und der lang laufende Batchprozess beendet wird, wird der Timer gestartet, bevor der lang laufende Batch beendet wird.

Python Benutzerdefinierte Funktionen (UDFs)

Databricks unterstützt die meisten benutzerdefinierten Python-Funktionen (UDFs) im Echtzeitmodus:

UDF-Typ Supported
Zustandslose UDF
  • Python scalar UDF (Link)
Yes
  • Pfeilskaer-UDF
Yes
  • Pandas scalar UDF (Link)
Yes
  • Pfeilfunktion (mapInArrow)
Yes
  • Pandas-Funktion (Link)
Yes
Stateful Grouping UDF (UDAF)
  • transformWithState (HINWEIS: Nur Row Schnittstelle)
Yes
  • applyInPandasWithState
No
Nicht stateful Grouping UDF (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Tabellenfunktion
No
UC UDF No

Es gibt mehrere Punkte, die Sie bei der Verwendung von Python UDFs im Echtzeitmodus berücksichtigen sollten:

  • Um die Latenz zu minimieren, konfigurieren Sie die Pfeilbatchgröße (spark.sql.execution.arrow.maxRecordsPerBatch) auf 1.
    • Kompromiss: Diese Konfiguration optimiert die Latenz auf Kosten des Durchsatzes. Für die meisten Workloads wird diese Einstellung empfohlen.
    • Erhöhen Sie die Batchgröße nur, wenn ein höherer Durchsatz erforderlich ist, um das Eingabevolume aufzunehmen und die potenzielle Latenzsteigerung zu akzeptieren.
  • Pandas UDFs und Funktionen funktionieren nicht gut mit einer Pfeilbatchgröße von 1.
    • Wenn Sie Pandas UDFs oder Funktionen verwenden, legen Sie die Pfeilbatchgröße auf einen höheren Wert fest (z. B. 100 oder höher).
    • Beachten Sie, dass dies eine höhere Latenz bedeutet. Databricks empfiehlt nach Möglichkeit die Verwendung von Pfeil-UDF oder -Funktionen.
  • Aufgrund des Leistungsproblems mit Pandas wird transformWithState nur mit der Row Schnittstelle unterstützt.

Optimierungstechniken

Technique Standardmäßig aktiviert
Asynchrone Fortschrittsverfolgung: Verschiebt das Schreiben in das Offsetlog und Commit-Log in einen asynchronen Thread und reduziert damit die Zeit zwischen zwei Mikrobatches. Dies kann dazu beitragen, die Latenz von zustandslosen Streamingabfragen zu verringern. No
Asynchrone Zustandsprüfpunkterstellung: Hilft, die Latenz von zustandsbehafteten Streamingabfragen zu reduzieren, indem sie mit der Verarbeitung des nächsten Mikro-Batchs beginnt, sobald die Berechnung des vorherigen Mikro-Batchs abgeschlossen ist, ohne auf Zustandsprüfpunkte zu warten. No

Limitations

Quelleinschränkung

Für Kinesis wird der Abrufmodus nicht unterstützt. Darüber hinaus können sich häufige Neupartitionen negativ auf die Latenz auswirken.

Beschränkung der Union

Für Union gibt es einige Einschränkungen:

  • Selbstvereinigung wird nicht unterstützt:
    • Kafka: Sie können nicht dieses Quelldatenframeobjekt und davon abgeleitete Union-Datenframes verwenden. Problemumgehung: Verwenden Sie unterschiedliche Dataframes, die aus derselben Quelle gelesen werden.
    • Kinesis: Datenframes, die von derselben Kinesis-Quelle abgeleitet sind, können nicht mit derselben Konfiguration verbunden werden. Problemumgehung: Neben der Verwendung verschiedener Dataframes können Sie jedem DataFrame eine andere Option "consumerName" zuweisen.
  • Zustandsbehaftete Operatoren (z. B. aggregate, deduplicate, transformWithState), die vor der Union definiert sind, werden nicht unterstützt.
  • Union mit Batchquellen wird nicht unterstützt.

MapPartitions-Einschränkung

mapPartitions in Scala und ähnlichen Python-APIs (mapInPandas, mapInArrow) verarbeiten einen Iterator der gesamten Eingabepartition und generieren einen Iterator der gesamten Ausgabe mit beliebiger Zuordnung zwischen Eingabe und Ausgabe. Diese APIs können Leistungsprobleme im Streaming-Real-Time Modus verursachen, indem die gesamte Ausgabe blockiert wird, wodurch die Latenz erhöht wird. Die Semantik dieser APIs unterstützt die Wasserzeichenpropagierung nicht gut.

Verwenden Sie skalare UDFs in Kombination mit transformieren komplexer Datentypen oder filter, um ähnliche Funktionalität zu erzielen.

Examples

Die folgenden Beispiele zeigen Abfragen, die unterstützt werden.

Zustandslose Abfragen

Alle einzel- oder mehrstufigen zustandslosen Abfragen werden unterstützt.

Kafka Quelle zu Kafka Senke

In diesem Beispiel lesen Sie aus einer Kafka-Quelle und schreiben in eine Kafka-Senke.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Neuzuweisung

In diesem Beispiel lesen Sie aus einer Kafka-Quelle, repartitionieren die Daten in zwanzig Partitionen und schreiben in eine Kafka-Senke.

Legen Sie die Spark-Konfiguration spark.sql.execution.sortBeforeRepartition fest false bevor Sie die Neuaufteilung verwenden.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Stream-snapshot join (nur Broadcast)

In diesem Beispiel lesen Sie aus Kafka, verknüpfen die Daten mit einer statischen Tabelle und schreiben in einen Kafka-Sink. Beachten Sie, dass nur streamstatische Verknüpfungen, die die statische Tabelle übertragen, unterstützt werden, was bedeutet, dass die statische Tabelle in den Arbeitsspeicher passen sollte.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Kinesis-Quelle an Kafka-Senke

In diesem Beispiel lesen Sie aus einer Kinesis-Quelle und schreiben in eine Kafka-Senke.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

In diesem Beispiel verknüpfen Sie zwei Kafka-Datenframes aus zwei verschiedenen Themen und schreiben in eine Kafka-Senke.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Zustandsbehaftete Abfragen

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Union mit Aggregation

In diesem Beispiel haben Sie zunächst zwei Kafka DataFrames aus zwei verschiedenen Themen zusammengefügt und dann eine Aggregation ausgeführt. Letztlich schreiben Sie in die Kafka-Senke.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Es gibt einen Unterschied zwischen dem Echtzeitmodus und anderen Ausführungsmodi im strukturierten Streaming in Bezug auf die Ausführung von StatefulProcessor in transformWithState. Siehe Verwenden von transformWithState im Echtzeitmodus

TransformWithState (PySpark, Row-Schnittstelle)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Es gibt einen Unterschied zwischen der Ausführung des Echtzeitmodus und anderen Ausführungsmodi im strukturierten Streaming.StatefulProcessortransformWithState Siehe Verwenden von transformWithState im Echtzeitmodus

Sinks

In Postgres schreiben über foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Dieses Feature ist in Databricks Runtime 17.1 und höher verfügbar.

Anzeigeratequelle

In diesem Beispiel lesen Sie aus einer Ratenquelle und zeigen den Streaming-DataFrame in einem Notebook an.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())