Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
De real-time-modus in Lakeflow Spark Declarative Pipelines is beschikbaar als Openbare preview in Databricks Runtime 18.1.2 op het preview-kanaal.
De real-timemodus maakt gegevensverwerking met een ultralage latentie mogelijk, met een end-to-end latentie van slechts vijf milliseconden. Gebruik de realtimemodus voor operationele workloads waarvoor directe reactie op streaminggegevens is vereist, zoals fraudedetectie en realtime personalisatie.
De realtimemodus is ook rechtstreeks beschikbaar in Structured Streaming buiten pijplijnen. Bekijk de real-time modus in Structured Streaming.
Hoe de realtime-modus lage latentie bereikt
Realtimemodus verschilt op drie belangrijke manieren van standaard continue verwerking:
- Langlopende batches: het systeem verwerkt gegevens naarmate deze beschikbaar is in de bron binnen langlopende batches (standaard is vijf minuten).
- Gelijktijdige planning van fasen: Alle queryfasen worden tegelijkertijd gepland. De rekenbron moet over voldoende beschikbare taakslots beschikken zodat alle fasen gelijktijdig kunnen worden uitgevoerd. Zie Rekengrootte.
- Streaming shuffle: gegevens worden doorgegeven tussen fasen zodra deze worden geproduceerd, in plaats van te wachten tot een upstream-fase is voltooid voordat de downstreamfase wordt gestart.
Het controlepuntinterval (geconfigureerd via pipelines.trigger.interval) bepaalt hoe vaak status- en bron offsets behouden blijven voor duurzame opslag. Langere intervallen verminderen de overhead van controlepunten, maar verhogen de hersteltijd na een fout- en vertragingsrapportage met metrische gegevens. Kortere intervallen verbeteren de duurzaamheid, maar voegen overhead toe.
Realtimemodus en continue pijplijnen
De real-time-modus is een specifieke vorm van een doorlopende trigger. Continue modus is nog steeds vereist: realtimemodus voegt latentieoptimalisaties op stroomniveau toe. Als u de realtime-modus wilt gebruiken, moet de pijplijn eerst in de continue modus draaien. De realtimemodus past vervolgens extra optimalisaties toe op stroomniveau om een latentie van sub-seconde te bereiken dan wat standaard continue verwerking biedt.
Voor het inschakelen van de realtimemodus zijn drie configuratiestappen vereist:
- Stel de pijplijn in op continue modus.
- Schakel de realtimemodus in op pijplijnniveau.
- Definieer een real-time updateflow.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 op het SDP Preview-kanaal |
| Rekentype | Klassiek rekenproces of serverloos |
Realtime-modus configureren
Stap 1: De pijplijn instellen op continue modus
Stel in de pijplijninstellingen de pijplijnmodus in op Doorlopend of stel deze in de pijplijn-JSON in:
{
"continuous": true
}
Stap 2: Realtimemodus inschakelen op pijplijnniveau
Voeg in uw pijplijninstellingen de volgende sleutel toe aan de Spark-configuratie onder Geavanceerde > Spark-configuratie:
spark.databricks.streaming.realTimeMode.enabled = true
U kunt dit ook instellen in de pijplijn-JSON:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Stap 3: Een realtime updatestroom definiëren
Voor de realtimemodus is een updatestroom vereist. Gebruik dp.create_sink() dit om het uitvoerdoel te definiëren en gebruik vervolgens de @dp.update_flow decorator die pipelines.trigger is ingesteld op "RealTime" en target wijst naar de sink.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Configuratieparameters op stroomniveau:
| Parameter | Verplicht | Verstek | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | Stel in op "RealTime" om de real-time-modus voor deze flow in te schakelen. |
pipelines.trigger.interval |
No | "5 minutes" |
Controlepuntinterval. Hiermee wordt bepaald hoe vaak status en offsets worden weggeschreven. Kortere waarden verbeteren de herstelbaarheid; langere waarden verminderen de overhead. |
Codevoorbeelden
Kafka naar Kafka
Lees vanuit een Kafka-onderwerp en schrijf naar een Kafka-uitvoerdoel:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Verrijken met een uitzendingsdeelname
Voeg een Kafka-stream toe aan een statische opzoektabel. Alleen broadcast-samenvoegingen (stream-to-static) worden ondersteund. Stream-to-stream-joins worden niet ondersteund in de realtime-modus.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Aggregatie
Gebeurtenissen per sleutel tellen met behulp van een groupBy met status. Stel spark.sql.shuffle.partitions in op hetzelfde aantal als het aantal invoerpartities voor stateful bewerkingen:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Ondersteunde bronnen en sinks
| Connector | Als bron | Als gegevenssink | Opmerkingen |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Maakt gebruik van de kafka-compatibele interface. |
| Azure Event Hubs (Kafka connector) | ✓ | ✓ | Maakt gebruik van de kafka-compatibele interface. |
| Amazon Kinesis | ✓ | Niet ondersteund | Alleen gebruiken voor de EFO-modus (Enhanced Fan-Out). |
| Delta | Niet ondersteund | Niet ondersteund | — |
Grootte berekenen
U kunt één realtime pijplijn per compute-resource uitvoeren als de compute-resource voldoende taakslots heeft. Beschikbare taakslots moeten alle taken in alle fasen van de query dekken.
| Pijplijntype | Configuration | Vereiste taaksloten |
|---|---|---|
| Staatloos met één fase (Kafka-bron + sink) |
maxPartitions = 8 |
8 |
| Twee-fasen toestandsafhankelijk systeem (Kafka-bron + shuffle) |
maxPartitions = 8, partities verdelen = 20 |
28 (8 + 20) |
| Drie fasen (Kafka-bron + twee willekeurige volgordes) |
maxPartitions = 8, twee willekeurige fasen van elk 20 |
48 (8 + 20 + 20) |
Als u maxPartitions niet instelt, gebruikt u het aantal partities van het Kafka-topic.
Operatorondersteuning
| Categorie | Operator | Supported |
|---|---|---|
| Staatloos: | Selectie, projectie | ✓ |
| UDFs | Scala UDF | ✓ (met beperkingen) |
| UDFs | Python UDF | ✓ (met beperkingen) |
| Aggregatie | som, aantal, max, min, gem. | ✓ |
| Windowing | Tumbling, glijdend | ✓ |
| Windowing | Session | Niet ondersteund |
| Ontdubbeling | dropDuplicates |
✓ (niet-gebonden status) |
| Ontdubbeling | dropDuplicatesWithinWatermark |
Niet ondersteund |
| Joins | Tabeldeelname uitzenden | ✓ |
| Joins | Samenvoeging van stroom met stroom | Niet ondersteund |
| Custom | transformWithState |
✓ (met gedragsverschillen) |
| Custom | union |
✓ (met beperkingen) |
| Custom | forEach |
Niet ondersteund |
| Custom | flatMapGroupsWithState |
Niet ondersteund |
| Custom | mapPartitions |
Niet ondersteund |
| Custom | forEachBatch |
Niet ondersteund |
transformWithState in realtimemodus
transformWithState wordt ondersteund in realtime-modus met de volgende verschillen van microbatchverwerking:
-
handleInputRowswordt eenmaal per rij aangeroepen in plaats van één keer per sleutel per batch. DeinputRowsiterator resulteert in één waarde per aanroep. - Event-timetimers worden niet ondersteund. Verwerkingstijdtimer wordt geactiveerd wanneer een langlopende batch wordt beëindigd als er geen gegevens zijn aangekomen.
-
transformWithStateInPandaswordt niet ondersteund.
Pandas UDF's in realtimemodus
Als u de latentie met pandas UDF's wilt minimaliseren, stelt u deze in op spark.sql.execution.arrow.maxRecordsPerBatch1. Hierbij wordt de latentie geoptimaliseerd ten koste van de doorvoer. Als doorvoer ook belangrijk is, stelt u deze waarde in op 100 of hoger.
Prestaties van realtimemodus bewaken
In de realtime-modus worden latentiemetrieken in StreamingQueryProgress onder het veld latencies weergegeven. Benader deze statistieken via een StreamingQueryListener of door de eigenschap lastProgress van de streaming-query te inspecteren.
| Metrische gegevens | Description |
|---|---|
processingLatencyMs |
Tijd tussen het moment waarop een record wordt gelezen door de stroom en wanneer deze volledig wordt verwerkt door de stroom |
sourceQueuingLatencyMs |
Tijd tussen het moment waarop een record succesvol naar de berichtenbus is geschreven (bijvoorbeeld de logtoevoegtijd in Kafka) en het moment waarop het voor het eerst door de gegevensstroom wordt gelezen |
e2eLatencyMs |
Totale end-to-end-latentie vanaf het moment dat het record bij de bron wordt aangemaakt tot het moment dat het volledig door de gegevensstroom is verwerkt |
Elke metrische waarde wordt gerapporteerd als p50, p90, p95 en p99 percentielen.
Limitations
Eén realtimegegevensstroom per pipeline wordt aanbevolen. Er zijn meerdere stromen toegestaan, maar taaksiteconflicten tussen stromen verhogen de latentie.
Zie Realtime modusbeperkingen voor een volledige lijst met operator- en bronbeperkingen.