Delen via


Realtimemodus in gestructureerd streamen

Important

Deze functie bevindt zich in openbare preview-versie.

Op deze pagina wordt de realtimemodus beschreven, een triggertype voor Structured Streaming waarmee gegevensverwerking met ultra lage latentie met end-to-end latentie tot 5 ms mogelijk is. Deze modus is ontworpen voor operationele workloads die directe reactie op streaminggegevens vereisen.

De realtimemodus is beschikbaar in Databricks Runtime 16.4 LTS en hoger.

Operationele workloads

Streaming-workloads kunnen breed worden onderverdeeld in analytische workloads en operationele workloads.

  • Analytische workloads maken gebruik van gegevensinname en transformatie, meestal volgens de medaillonarchitectuur (bijvoorbeeld het opnemen van gegevens in de bronzen, zilveren en gouden tabellen).
  • Operationele workloads verbruiken realtimegegevens, passen bedrijfslogica toe en activeren downstreamacties of beslissingen.

Enkele voorbeelden van operationele workloads zijn:

  • Het blokkeren of markeren van een creditcardtransactie in realtime als een fraudescore een drempelwaarde overschrijdt, op basis van factoren zoals ongebruikelijke locatie, grote transactiegrootte of snelle uitgavenpatronen.
  • Het versturen van een promotiebericht wanneer clickstream-gegevens laten zien dat een gebruiker al vijf minuten jeans aan het bekijken is, waarbij een korting van 25% wordt aangeboden als ze binnen de komende 15 minuten kopen.

Over het algemeen worden de operationele workloads gekenmerkt door de noodzaak van een end-to-end latentie van onder de seconde. Dit kan worden bereikt met de realtimemodus in Apache Spark Structured Streaming.

Hoe de realtime-modus lage latentie bereikt

De realtimemodus verbetert de uitvoeringsarchitectuur door:

  • Het uitvoeren van langlopende batches (de standaardwaarde is vijf minuten), waarin gegevens worden verwerkt zodra deze beschikbaar zijn in de bron.
  • Alle fasen van de query worden gelijktijdig gepland. Hiervoor moet het aantal beschikbare taaksites gelijk zijn aan of groter zijn dan het aantal taken van alle fasen in een batch.
  • De gegevens worden tussen fasen doorgegeven zodra ze zijn geproduceerd met behulp van een streaming shuffle.

Aan het einde van de verwerking van een batch en voordat de volgende batch wordt gestart, worden controlepunten voor gestructureerd streamen uitgevoerd en worden metrische gegevens voor de laatste batch beschikbaar gemaakt. Als de batches langer zijn, zijn deze activiteiten mogelijk minder frequent, wat leidt tot langere herhalingen in het geval van fouten en vertraging in de beschikbaarheid van metrische gegevens. Als de batches echter kleiner zijn, worden deze activiteiten frequenter, wat mogelijk van invloed is op de latentie. Databricks raadt u aan de realtime-modus te benchmarken op basis van uw doelworkload en vereisten om het juiste triggerinterval te vinden.

Clusterconfiguratie

Als u de realtime-modus in Structured Streaming wilt gebruiken, moet u een klassieke Lakeflow-taak configureren:

  1. Klik in uw Azure Databricks-werkruimte op Nieuw in de linkerbovenhoek. Kies Meer en klik op Cluster.

  2. Fotonversnelling wissen.

  3. Schakel Automatisch schalen inschakelen uit.

  4. Wis onder Geavanceerde prestaties de optie Spot-exemplaren gebruiken.

  5. Klik onder De modus Geavanceerd en Toegang op Handmatig en selecteer Dedicated (voorheen: Eén gebruiker).

  6. Voer onder Spark het volgende in onder Spark-configuratie:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Klik op Create.

Vereisten voor clustergrootte

U kunt één realtime taak per cluster uitvoeren als het cluster voldoende taaksites heeft.

Als u in lage-latentiemodus wilt werken, moet het totale aantal beschikbare taakslots groter zijn dan of gelijk zijn aan het aantal taken over alle querystadia heen.

Voorbeelden van sleufberekeningen

Pijplijn zonder status met één fase (Kafka-bron + uitvoer):

Als maxPartitions = 8, hebt u ten minste 8 sleuven nodig. Als maxPartitions niet is ingesteld, gebruikt u het aantal Kafka-onderwerppartities.

Stateful pijplijn met twee fasen (Kafka-bron en shuffle):

Als maxPartitions = 8 en shuffle-partities = 20, hebt u 8 + 20 = 28 slots nodig.

Pijplijn met drie fasen (Kafka-bron + shuffle + repartition):

Met maxPartitions = 8 en twee shuffle-fasen van elk 20, heeft u 8 + 20 + 20 = 48 sleuven nodig.

Belangrijke overwegingen

Houd rekening met het volgende wanneer u uw cluster configureert:

  • In tegenstelling tot de microbatchmodus kunnen realtime taken inactief blijven tijdens het wachten op gegevens, zodat de juiste grootte essentieel is om verspilde resources te voorkomen.
  • Streef naar een gebruiksniveau van bijvoorbeeld 50% door het afstemmen van de volgende aspecten:
    • maxPartitions (voor Kafka)
    • spark.sql.shuffle.partitions (voor shuffle-fasen)
  • Databricks raadt aan maxPartitions in te stellen, zodat elke taak meerdere Kafka-partities verwerkt om de overhead te verminderen.
  • Pas taakslots per werknemer aan zodat deze overeenkomen met de werkbelasting voor eenvoudige taken met één fase.
  • Experimenteer voor shuffle-intensieve taken om het minimale aantal shuffle-partities te vinden dat achterstanden voorkomt, en pas van daaruit aan. De taak zal niet ingepland worden als het cluster niet genoeg slots heeft.

Note

Vanuit Databricks Runtime 16.4 LTS en hoger maken alle realtime-pijplijnen gebruik van controlepunt v2, waardoor naadloze schakelen tussen realtime- en microbatchmodi mogelijk is.

Queryconfiguratie

U moet de realtime-trigger inschakelen om op te geven dat een query moet worden uitgevoerd met behulp van de modus met lage latentie. Bovendien worden realtime-triggers alleen ondersteund in de updatemodus. Voorbeeld:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Voorheen was de end-to-end-querylatentie nauw gekoppeld aan de batchduur, waardoor batchduur een goede indicator van de querylatentie was. Deze methode is echter niet langer van toepassing in de realtimemodus, waarvoor alternatieve methoden nodig zijn voor het meten van latentie. End-to-end latentie is de workloadspecifiek en kan soms alleen nauwkeurig worden gemeten met bedrijfslogica. Als de brontijdstempel bijvoorbeeld wordt uitgevoerd in Kafka, kan de latentie worden berekend als het verschil tussen de tijdstempel van kafka en de tijdstempel van de bron.

U kunt de end-to-end-latentie op verschillende manieren schatten op basis van gedeeltelijke informatie die tijdens het streamingproces is verzameld.

StreamingQueryProgress gebruiken

De volgende metrische gegevens worden opgenomen in de StreamingQueryProgress gebeurtenis, die automatisch wordt geregistreerd in de stuurprogrammalogboeken. U kunt ze ook openen via de callback-functie van de StreamingQueryListeneronQueryProgress() functie. QueryProgressEvent.json() ofwel toString() extra metrische gegevens voor de realtimemodus opnemen.

  1. Verwerkingslatentie (processingLatencyMs). De tijd die is verstreken tussen het moment waarop de realtime-modusquery een record leest en voordat deze naar de volgende fase of downstream wordt geschreven. Voor query's met één fase meet dit dezelfde duur als de E2E-latentie. Deze metrische waarde wordt per taak gerapporteerd.
  2. Wachtrijlatentie van de bron (sourceQueuingLatencyMs). De tijd die verstrijkt tussen het moment dat een record succesvol is geschreven naar een berichtenbus, bijvoorbeeld de toevoegingstijd van het logbestand in Kafka, en het moment dat het record voor het eerst wordt gelezen door een query in de realtimemodus. Deze metrische waarde wordt per taak gerapporteerd.
  3. E2E Latentie (e2eLatencyMs). De tijd tussen het moment waarop het record succesvol is geschreven naar een berichtenbus en wanneer het record downstream wordt geschreven door de realtime-modusquery. Deze metrische waarde wordt geaggregeerd per batch voor alle records die door alle taken worden verwerkt.

Voorbeeld:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Observe-API gebruiken bij jobs

Met de Observe-API kunt u latentie meten zonder een andere taak te starten. Als u een brontijdstempel hebt die de aankomsttijd van de brongegevens bijwerkt en deze wordt doorgegeven voordat u de sink bereikt, of als u een manier kunt vinden om de tijdstempel door te geven, kunt u de latentie van elke batch schatten met behulp van de Observe-API:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In dit voorbeeld wordt een huidige tijdstempel vastgelegd voordat de invoer wordt uitgevoerd. Latentie wordt geschat door het verschil tussen deze tijdstempel en de brontijdstempel van de record te berekenen. De resultaten worden opgenomen in voortgangsrapporten en beschikbaar gesteld aan listeners. Hier volgt een voorbeelduitvoer:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Wat wordt ondersteund?

Environments

Clustertype Supported
Dedicated (voorheen: één gebruiker) Yes
Standard (voorheen: gedeeld) No
Klassieke declaratieve Pijplijnen van Lakeflow Spark No
Lakeflow Spark-declaratieve pijplijnen serverloos No
Serverless No

Languages

Language Supported
Scala Yes
Java Yes
Python Yes

Uitvoeringsmodi

Uitvoeringsmodus Supported
Updatemodus Yes
Append mode No
Volledige modus No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (met Kafka Connector) Yes
Kinesis Ja (enkel EFO-modus)
Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (met Kafka Connector) Yes
Kinesis No
Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) No
Apache Pulsar No
Willekeurige sinks (met forEachWriter) Yes

Operators

Operators Supported
Staatloze bewerkingen
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Ja (met enkele beperkingen)
  • Python UDF
Ja (met enkele beperkingen)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Aggregatiesfuncties Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Ja (de toestand is onbegrensd)
  • dropDuplicatesWithinWatermark
No
Stream - Tabel Koppeling
  • Broadcast-tabel (moet klein zijn)
Yes
Stream - Stream Toevoegen No
(plat)MapGroupsWithState No
transformWithState Ja (met enkele verschillen)
union Ja (met enkele beperkingen)
forEach Yes
forEachBatch No
mapPartitions Nee (zie beperking)

TransformWithState gebruiken in realtimemodus

Voor het bouwen van aangepaste stateful toepassingen ondersteunt Databricks transformWithState, een API in Apache Spark Structured Streaming. Zie Een aangepaste stateful toepassing bouwen voor meer informatie over de API en codefragmenten.

Er zijn echter enkele verschillen tussen hoe de API zich gedraagt in realtime-modus en traditionele streamingquery's die gebruikmaken van de microbatcharchitectuur.

  • De methode in realtimemodus handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) wordt aangeroepen voor elke rij.
    • De inputRows iterator retourneert één waarde. In de microbatchmodus wordt deze eenmaal aangeroepen voor elke sleutel en retourneert de inputRows iterator alle waarden voor een sleutel in de microbatch.
    • U moet zich bewust zijn van dit verschil bij het schrijven van uw code.
  • Timers voor gebeurtenistijd worden niet ondersteund in realtimemodus.
  • In real-time modus worden timers vertraagd bij het afvuren, afhankelijk van de gegevensaanvoer. Als er geen gegevens zijn, wordt deze aan het einde van de langlopende batch geactiveerd. Als een timer bijvoorbeeld om 10:00:00 uur afgaat en er op hetzelfde moment geen gegevens binnenkomen, gaat deze niet af. Als gegevens om 10:00:10 binnenkomen, wordt de timer geactiveerd met een vertraging van 10 seconden. Als er geen gegevens binnenkomen en de langlopende batch wordt beëindigd, wordt de timer gestart voordat de langlopende batch wordt beëindigd.

Python UDF's

Databricks ondersteunt het merendeel van door de gebruiker gedefinieerde Python-functies (UDF's) in realtime:

UDF-type Supported
Staatloze UDF
Yes
  • Pijl scalaire UDF
Yes
Yes
  • Pijlfunctie (mapInArrow)
Yes
Yes
Stateful Grouping UDF (UDAF)
  • transformWithState (OPMERKING: alleen Row interface)
Yes
  • applyInPandasWithState
No
Niet-stateful groeperingS-UDF (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Tabelfunctie
No
UC UDF No

Er zijn verschillende punten om rekening mee te houden bij het gebruik van Python UDF's in realtime-modus:

  • Als u de latentie wilt minimaliseren, configureert u de grootte van de pijlbatch (spark.sql.execution.arrow.maxRecordsPerBatch) op 1.
    • Trade-off: Deze configuratie optimaliseert voor latentie ten koste van doorvoer. Voor de meeste workloads wordt deze instelling aanbevolen.
    • Verhoog de batchgrootte alleen als een hogere doorvoer is vereist voor invoervolume, waarbij de potentiële toename van de latentie wordt geaccepteerd.
  • Pandas UDF's en functies presteren niet goed met een pijlbatchgrootte van 1.
    • Als u pandas UDF's of functies gebruikt, stelt u de grootte van de pijlbatch in op een hogere waarde (bijvoorbeeld 100 of hoger).
    • Houd er rekening mee dat dit een hogere latentie impliceert. Databricks raadt het gebruik van Arrow UDF of functie aan, indien mogelijk.
  • Vanwege het prestatieprobleem met pandas wordt transformWithState alleen ondersteund met de Row interface.

Optimalisatietechnieken

Technique Standaard ingeschakeld
Asynchrone voortgangstracering: Schrijven naar het offsetlogboek en het commitlogboek wordt naar een asynchrone thread verplaatst, waardoor de onderlinge tijd tussen twee microbatches wordt verminderd. Dit kan helpen de latentie van staatloze streamingquery's te verminderen. No
Asynchrone statuscontrolepunten: helpt de latentie van stateful streamingquery's te verminderen door de volgende microbatch te verwerken zodra de berekening van de vorige microbatch is voltooid, zonder te wachten op statuscontrolepunten. No

Limitations

Bronbeperking

Voor Kinesis wordt de pollingmodus niet ondersteund. Bovendien kunnen frequente repartities een negatieve invloed hebben op de latentie.

Beperking van unie

Voor Union gelden enkele beperkingen:

  • Self-union wordt niet ondersteund:
    • Kafka: U kunt niet hetzelfde brongegevensframeobject gebruiken en tegelijkertijd de daaruit afgeleide gegevensframes samenvoegen. Tijdelijke oplossing: gebruik verschillende Dataframes die uit dezelfde bron worden gelezen.
    • Kinesis: U kunt geen gegevensframes samenvoegen die zijn afgeleid van dezelfde Kinesis-bron met dezelfde configuratie. Tijdelijke oplossing: Naast het gebruik van verschillende Dataframes kunt u een andere optie 'consumerName' toewijzen aan elk DataFrame.
  • Stateful operators (bijvoorbeeld aggregate, deduplicate, transformWithState) die zijn gedefinieerd vóór de Union worden niet ondersteund.
  • Samenvoegen met batchbronnen wordt niet ondersteund.

Beperking van MapPartitions

mapPartitions in Scala en vergelijkbare Python-API's (mapInPandas, mapInArrow) nemen een iterator van de gehele invoerpartitie en produceren een iterator voor de gehele uitvoer met arbitraire koppeling tussen invoer en uitvoer. Deze API's kunnen prestatieproblemen veroorzaken in de streamingmodus Real-Time door de volledige uitvoer te blokkeren, waardoor de latentie toeneemt. De semantiek van deze API's biedt geen ondersteuning voor het doorgeven van watermerken.

Gebruik scalaire UDF's in combinatie met complexe gegevenstypen transformeren of filter in plaats daarvan om vergelijkbare functionaliteit te bereiken.

Examples

In de onderstaande voorbeelden ziet u query's die worden ondersteund.

Stateless queries

Alle stateless query's met één of meerdere fasen worden ondersteund.

Kafka-bron naar Kafka-sink

In dit voorbeeld leest u uit een Kafka-bron en schrijft u naar een Kafka-sink.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Verdeling

In dit voorbeeld leest u uit een Kafka-bron, herpartitioneert u de gegevens in 20 partities en schrijft u naar een Kafka-sink.

Stel de Spark-configuratie spark.sql.execution.sortBeforeRepartition in op false voordat u de repartitie gebruikt.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Stream-snapshot join (alleen uitzenden)

In dit voorbeeld leest u vanuit Kafka, voegt u de gegevens samen met een statische tabel en schrijft u naar een Kafka-sink. Houd er rekening mee dat alleen stream-statische joins die de statische tabel uitzenden, worden ondersteund. Dit betekent dat de statische tabel in het geheugen moet passen.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Kinesis-bron naar Kafka-sink

In dit voorbeeld leest u uit een Kinesis-bron en schrijft u naar een Kafka-sink.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

In dit voorbeeld koppelt u twee Kafka DataFrames uit twee verschillende onderwerpen en schrijft u naar een Kafka-sink.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Stateful query's

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Combinatie met aggregatie

In dit voorbeeld koppelt u eerst twee Kafka DataFrames uit twee verschillende onderwerpen en voert u vervolgens een aggregatie uit. Uiteindelijk schrijft u naar de Kafka-sink.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Er is een verschil tussen hoe de realtime modus en andere uitvoeringsmodi in Structured Streaming de StatefulProcessor uitvoeren in transformWithState. Zie TransformWithState gebruiken in realtime-modus

TransformWithState (PySpark, rijinterface)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Er is een verschil tussen de manier waarop de realtimemodus en andere uitvoeringsmodi in Structured Streaming worden StatefulProcessor uitgevoerd in transformWithState. Zie TransformWithState gebruiken in realtime-modus

Sinks

Schrijven naar Postgres via foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Deze functie is beschikbaar in Databricks Runtime 17.1 en hoger.

Bron van weergavesnelheid

In dit voorbeeld leest u uit een frequentiebron en geeft u het streaming DataFrame weer in een notebook.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())