Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
Deze functie bevindt zich in openbare preview-versie.
Op deze pagina wordt de realtimemodus beschreven, een triggertype voor Structured Streaming waarmee gegevensverwerking met ultra lage latentie met end-to-end latentie tot 5 ms mogelijk is. Deze modus is ontworpen voor operationele workloads die directe reactie op streaminggegevens vereisen.
De realtimemodus is beschikbaar in Databricks Runtime 16.4 LTS en hoger.
Operationele workloads
Streaming-workloads kunnen breed worden onderverdeeld in analytische workloads en operationele workloads.
- Analytische workloads maken gebruik van gegevensinname en transformatie, meestal volgens de medaillonarchitectuur (bijvoorbeeld het opnemen van gegevens in de bronzen, zilveren en gouden tabellen).
- Operationele workloads verbruiken realtimegegevens, passen bedrijfslogica toe en activeren downstreamacties of beslissingen.
Enkele voorbeelden van operationele workloads zijn:
- Het blokkeren of markeren van een creditcardtransactie in realtime als een fraudescore een drempelwaarde overschrijdt, op basis van factoren zoals ongebruikelijke locatie, grote transactiegrootte of snelle uitgavenpatronen.
- Het versturen van een promotiebericht wanneer clickstream-gegevens laten zien dat een gebruiker al vijf minuten jeans aan het bekijken is, waarbij een korting van 25% wordt aangeboden als ze binnen de komende 15 minuten kopen.
Over het algemeen worden de operationele workloads gekenmerkt door de noodzaak van een end-to-end latentie van onder de seconde. Dit kan worden bereikt met de realtimemodus in Apache Spark Structured Streaming.
Hoe de realtime-modus lage latentie bereikt
De realtimemodus verbetert de uitvoeringsarchitectuur door:
- Het uitvoeren van langlopende batches (de standaardwaarde is vijf minuten), waarin gegevens worden verwerkt zodra deze beschikbaar zijn in de bron.
- Alle fasen van de query worden gelijktijdig gepland. Hiervoor moet het aantal beschikbare taaksites gelijk zijn aan of groter zijn dan het aantal taken van alle fasen in een batch.
- De gegevens worden tussen fasen doorgegeven zodra ze zijn geproduceerd met behulp van een streaming shuffle.
Aan het einde van de verwerking van een batch en voordat de volgende batch wordt gestart, worden controlepunten voor gestructureerd streamen uitgevoerd en worden metrische gegevens voor de laatste batch beschikbaar gemaakt. Als de batches langer zijn, zijn deze activiteiten mogelijk minder frequent, wat leidt tot langere herhalingen in het geval van fouten en vertraging in de beschikbaarheid van metrische gegevens. Als de batches echter kleiner zijn, worden deze activiteiten frequenter, wat mogelijk van invloed is op de latentie. Databricks raadt u aan de realtime-modus te benchmarken op basis van uw doelworkload en vereisten om het juiste triggerinterval te vinden.
Clusterconfiguratie
Als u de realtime-modus in Structured Streaming wilt gebruiken, moet u een klassieke Lakeflow-taak configureren:
Klik in uw Azure Databricks-werkruimte op Nieuw in de linkerbovenhoek. Kies Meer en klik op Cluster.
Fotonversnelling wissen.
Schakel Automatisch schalen inschakelen uit.
Wis onder Geavanceerde prestaties de optie Spot-exemplaren gebruiken.
Klik onder De modus Geavanceerd en Toegang op Handmatig en selecteer Dedicated (voorheen: Eén gebruiker).
Voer onder Spark het volgende in onder Spark-configuratie:
spark.databricks.streaming.realTimeMode.enabled trueKlik op Create.
Vereisten voor clustergrootte
U kunt één realtime taak per cluster uitvoeren als het cluster voldoende taaksites heeft.
Als u in lage-latentiemodus wilt werken, moet het totale aantal beschikbare taakslots groter zijn dan of gelijk zijn aan het aantal taken over alle querystadia heen.
Voorbeelden van sleufberekeningen
Pijplijn zonder status met één fase (Kafka-bron + uitvoer):
Als maxPartitions = 8, hebt u ten minste 8 sleuven nodig. Als maxPartitions niet is ingesteld, gebruikt u het aantal Kafka-onderwerppartities.
Stateful pijplijn met twee fasen (Kafka-bron en shuffle):
Als maxPartitions = 8 en shuffle-partities = 20, hebt u 8 + 20 = 28 slots nodig.
Pijplijn met drie fasen (Kafka-bron + shuffle + repartition):
Met maxPartitions = 8 en twee shuffle-fasen van elk 20, heeft u 8 + 20 + 20 = 48 sleuven nodig.
Belangrijke overwegingen
Houd rekening met het volgende wanneer u uw cluster configureert:
- In tegenstelling tot de microbatchmodus kunnen realtime taken inactief blijven tijdens het wachten op gegevens, zodat de juiste grootte essentieel is om verspilde resources te voorkomen.
- Streef naar een gebruiksniveau van bijvoorbeeld 50% door het afstemmen van de volgende aspecten:
-
maxPartitions(voor Kafka) -
spark.sql.shuffle.partitions(voor shuffle-fasen)
-
- Databricks raadt aan maxPartitions in te stellen, zodat elke taak meerdere Kafka-partities verwerkt om de overhead te verminderen.
- Pas taakslots per werknemer aan zodat deze overeenkomen met de werkbelasting voor eenvoudige taken met één fase.
- Experimenteer voor shuffle-intensieve taken om het minimale aantal shuffle-partities te vinden dat achterstanden voorkomt, en pas van daaruit aan. De taak zal niet ingepland worden als het cluster niet genoeg slots heeft.
Note
Vanuit Databricks Runtime 16.4 LTS en hoger maken alle realtime-pijplijnen gebruik van controlepunt v2, waardoor naadloze schakelen tussen realtime- en microbatchmodi mogelijk is.
Queryconfiguratie
U moet de realtime-trigger inschakelen om op te geven dat een query moet worden uitgevoerd met behulp van de modus met lage latentie. Bovendien worden realtime-triggers alleen ondersteund in de updatemodus. Voorbeeld:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Voorheen was de end-to-end-querylatentie nauw gekoppeld aan de batchduur, waardoor batchduur een goede indicator van de querylatentie was. Deze methode is echter niet langer van toepassing in de realtimemodus, waarvoor alternatieve methoden nodig zijn voor het meten van latentie. End-to-end latentie is de workloadspecifiek en kan soms alleen nauwkeurig worden gemeten met bedrijfslogica. Als de brontijdstempel bijvoorbeeld wordt uitgevoerd in Kafka, kan de latentie worden berekend als het verschil tussen de tijdstempel van kafka en de tijdstempel van de bron.
U kunt de end-to-end-latentie op verschillende manieren schatten op basis van gedeeltelijke informatie die tijdens het streamingproces is verzameld.
StreamingQueryProgress gebruiken
De volgende metrische gegevens worden opgenomen in de StreamingQueryProgress gebeurtenis, die automatisch wordt geregistreerd in de stuurprogrammalogboeken. U kunt ze ook openen via de callback-functie van de StreamingQueryListeneronQueryProgress() functie.
QueryProgressEvent.json() ofwel toString() extra metrische gegevens voor de realtimemodus opnemen.
- Verwerkingslatentie (processingLatencyMs). De tijd die is verstreken tussen het moment waarop de realtime-modusquery een record leest en voordat deze naar de volgende fase of downstream wordt geschreven. Voor query's met één fase meet dit dezelfde duur als de E2E-latentie. Deze metrische waarde wordt per taak gerapporteerd.
- Wachtrijlatentie van de bron (sourceQueuingLatencyMs). De tijd die verstrijkt tussen het moment dat een record succesvol is geschreven naar een berichtenbus, bijvoorbeeld de toevoegingstijd van het logbestand in Kafka, en het moment dat het record voor het eerst wordt gelezen door een query in de realtimemodus. Deze metrische waarde wordt per taak gerapporteerd.
- E2E Latentie (e2eLatencyMs). De tijd tussen het moment waarop het record succesvol is geschreven naar een berichtenbus en wanneer het record downstream wordt geschreven door de realtime-modusquery. Deze metrische waarde wordt geaggregeerd per batch voor alle records die door alle taken worden verwerkt.
Voorbeeld:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Observe-API gebruiken bij jobs
Met de Observe-API kunt u latentie meten zonder een andere taak te starten. Als u een brontijdstempel hebt die de aankomsttijd van de brongegevens bijwerkt en deze wordt doorgegeven voordat u de sink bereikt, of als u een manier kunt vinden om de tijdstempel door te geven, kunt u de latentie van elke batch schatten met behulp van de Observe-API:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
In dit voorbeeld wordt een huidige tijdstempel vastgelegd voordat de invoer wordt uitgevoerd. Latentie wordt geschat door het verschil tussen deze tijdstempel en de brontijdstempel van de record te berekenen. De resultaten worden opgenomen in voortgangsrapporten en beschikbaar gesteld aan listeners. Hier volgt een voorbeelduitvoer:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Wat wordt ondersteund?
Environments
| Clustertype | Supported |
|---|---|
| Dedicated (voorheen: één gebruiker) | Yes |
| Standard (voorheen: gedeeld) | No |
| Klassieke declaratieve Pijplijnen van Lakeflow Spark | No |
| Lakeflow Spark-declaratieve pijplijnen serverloos | No |
| Serverless | No |
Languages
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Uitvoeringsmodi
| Uitvoeringsmodus | Supported |
|---|---|
| Updatemodus | Yes |
| Append mode | No |
| Volledige modus | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (met Kafka Connector) | Yes |
| Kinesis | Ja (enkel EFO-modus) |
| Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (met Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) | No |
| Apache Pulsar | No |
| Willekeurige sinks (met forEachWriter) | Yes |
Operators
| Operators | Supported |
|---|---|
| Staatloze bewerkingen | |
|
Yes |
|
Yes |
| UDFs | |
|
Ja (met enkele beperkingen) |
|
Ja (met enkele beperkingen) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Aggregatiesfuncties | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Ja (de toestand is onbegrensd) |
|
No |
| Stream - Tabel Koppeling | |
|
Yes |
| Stream - Stream Toevoegen | No |
| (plat)MapGroupsWithState | No |
| transformWithState | Ja (met enkele verschillen) |
| union | Ja (met enkele beperkingen) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Nee (zie beperking) |
TransformWithState gebruiken in realtimemodus
Voor het bouwen van aangepaste stateful toepassingen ondersteunt Databricks transformWithState, een API in Apache Spark Structured Streaming. Zie Een aangepaste stateful toepassing bouwen voor meer informatie over de API en codefragmenten.
Er zijn echter enkele verschillen tussen hoe de API zich gedraagt in realtime-modus en traditionele streamingquery's die gebruikmaken van de microbatcharchitectuur.
- De methode in realtimemodus
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)wordt aangeroepen voor elke rij.- De
inputRowsiterator retourneert één waarde. In de microbatchmodus wordt deze eenmaal aangeroepen voor elke sleutel en retourneert deinputRowsiterator alle waarden voor een sleutel in de microbatch. - U moet zich bewust zijn van dit verschil bij het schrijven van uw code.
- De
- Timers voor gebeurtenistijd worden niet ondersteund in realtimemodus.
- In real-time modus worden timers vertraagd bij het afvuren, afhankelijk van de gegevensaanvoer. Als er geen gegevens zijn, wordt deze aan het einde van de langlopende batch geactiveerd. Als een timer bijvoorbeeld om 10:00:00 uur afgaat en er op hetzelfde moment geen gegevens binnenkomen, gaat deze niet af. Als gegevens om 10:00:10 binnenkomen, wordt de timer geactiveerd met een vertraging van 10 seconden. Als er geen gegevens binnenkomen en de langlopende batch wordt beëindigd, wordt de timer gestart voordat de langlopende batch wordt beëindigd.
Python UDF's
Databricks ondersteunt het merendeel van door de gebruiker gedefinieerde Python-functies (UDF's) in realtime:
| UDF-type | Supported |
|---|---|
| Staatloze UDF | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Stateful Grouping UDF (UDAF) | |
|
Yes |
|
No |
| Niet-stateful groeperingS-UDF (UDAF) | |
|
No |
|
No |
|
No |
| Tabelfunctie | |
|
No |
| UC UDF | No |
Er zijn verschillende punten om rekening mee te houden bij het gebruik van Python UDF's in realtime-modus:
- Als u de latentie wilt minimaliseren, configureert u de grootte van de pijlbatch (spark.sql.execution.arrow.maxRecordsPerBatch) op 1.
- Trade-off: Deze configuratie optimaliseert voor latentie ten koste van doorvoer. Voor de meeste workloads wordt deze instelling aanbevolen.
- Verhoog de batchgrootte alleen als een hogere doorvoer is vereist voor invoervolume, waarbij de potentiële toename van de latentie wordt geaccepteerd.
- Pandas UDF's en functies presteren niet goed met een pijlbatchgrootte van 1.
- Als u pandas UDF's of functies gebruikt, stelt u de grootte van de pijlbatch in op een hogere waarde (bijvoorbeeld 100 of hoger).
- Houd er rekening mee dat dit een hogere latentie impliceert. Databricks raadt het gebruik van Arrow UDF of functie aan, indien mogelijk.
- Vanwege het prestatieprobleem met pandas wordt transformWithState alleen ondersteund met de
Rowinterface.
Optimalisatietechnieken
| Technique | Standaard ingeschakeld |
|---|---|
| Asynchrone voortgangstracering: Schrijven naar het offsetlogboek en het commitlogboek wordt naar een asynchrone thread verplaatst, waardoor de onderlinge tijd tussen twee microbatches wordt verminderd. Dit kan helpen de latentie van staatloze streamingquery's te verminderen. | No |
| Asynchrone statuscontrolepunten: helpt de latentie van stateful streamingquery's te verminderen door de volgende microbatch te verwerken zodra de berekening van de vorige microbatch is voltooid, zonder te wachten op statuscontrolepunten. | No |
Limitations
Bronbeperking
Voor Kinesis wordt de pollingmodus niet ondersteund. Bovendien kunnen frequente repartities een negatieve invloed hebben op de latentie.
Beperking van unie
Voor Union gelden enkele beperkingen:
- Self-union wordt niet ondersteund:
- Kafka: U kunt niet hetzelfde brongegevensframeobject gebruiken en tegelijkertijd de daaruit afgeleide gegevensframes samenvoegen. Tijdelijke oplossing: gebruik verschillende Dataframes die uit dezelfde bron worden gelezen.
- Kinesis: U kunt geen gegevensframes samenvoegen die zijn afgeleid van dezelfde Kinesis-bron met dezelfde configuratie. Tijdelijke oplossing: Naast het gebruik van verschillende Dataframes kunt u een andere optie 'consumerName' toewijzen aan elk DataFrame.
- Stateful operators (bijvoorbeeld
aggregate,deduplicate,transformWithState) die zijn gedefinieerd vóór de Union worden niet ondersteund. - Samenvoegen met batchbronnen wordt niet ondersteund.
Beperking van MapPartitions
mapPartitions in Scala en vergelijkbare Python-API's (mapInPandas, mapInArrow) nemen een iterator van de gehele invoerpartitie en produceren een iterator voor de gehele uitvoer met arbitraire koppeling tussen invoer en uitvoer. Deze API's kunnen prestatieproblemen veroorzaken in de streamingmodus Real-Time door de volledige uitvoer te blokkeren, waardoor de latentie toeneemt. De semantiek van deze API's biedt geen ondersteuning voor het doorgeven van watermerken.
Gebruik scalaire UDF's in combinatie met complexe gegevenstypen transformeren of filter in plaats daarvan om vergelijkbare functionaliteit te bereiken.
Examples
In de onderstaande voorbeelden ziet u query's die worden ondersteund.
Stateless queries
Alle stateless query's met één of meerdere fasen worden ondersteund.
Kafka-bron naar Kafka-sink
In dit voorbeeld leest u uit een Kafka-bron en schrijft u naar een Kafka-sink.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Verdeling
In dit voorbeeld leest u uit een Kafka-bron, herpartitioneert u de gegevens in 20 partities en schrijft u naar een Kafka-sink.
Stel de Spark-configuratie spark.sql.execution.sortBeforeRepartition in op false voordat u de repartitie gebruikt.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Stream-snapshot join (alleen uitzenden)
In dit voorbeeld leest u vanuit Kafka, voegt u de gegevens samen met een statische tabel en schrijft u naar een Kafka-sink. Houd er rekening mee dat alleen stream-statische joins die de statische tabel uitzenden, worden ondersteund. Dit betekent dat de statische tabel in het geheugen moet passen.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kinesis-bron naar Kafka-sink
In dit voorbeeld leest u uit een Kinesis-bron en schrijft u naar een Kafka-sink.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
In dit voorbeeld koppelt u twee Kafka DataFrames uit twee verschillende onderwerpen en schrijft u naar een Kafka-sink.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Stateful query's
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Combinatie met aggregatie
In dit voorbeeld koppelt u eerst twee Kafka DataFrames uit twee verschillende onderwerpen en voert u vervolgens een aggregatie uit. Uiteindelijk schrijft u naar de Kafka-sink.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Er is een verschil tussen hoe de realtime modus en andere uitvoeringsmodi in Structured Streaming de StatefulProcessor uitvoeren in transformWithState. Zie TransformWithState gebruiken in realtime-modus
TransformWithState (PySpark, rijinterface)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Er is een verschil tussen de manier waarop de realtimemodus en andere uitvoeringsmodi in Structured Streaming worden StatefulProcessor uitgevoerd in transformWithState. Zie TransformWithState gebruiken in realtime-modus
Sinks
Schrijven naar Postgres via foreachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Deze functie is beschikbaar in Databricks Runtime 17.1 en hoger.
Bron van weergavesnelheid
In dit voorbeeld leest u uit een frequentiebron en geeft u het streaming DataFrame weer in een notebook.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())