Query's voor gestructureerd streamen bewaken in Azure Databricks
Azure Databricks biedt ingebouwde bewaking voor structured streaming-toepassingen via de Spark-gebruikersinterface op het tabblad Streaming .
Structured Streaming-query's onderscheiden in de Spark-gebruikersinterface
Geef uw streams een unieke querynaam op door toe te voegen .queryName(<query-name>)
aan uw writeStream
code om eenvoudig te onderscheiden welke metrische gegevens tot welke stream behoren in de Spark-gebruikersinterface.
Metrische gegevens over gestructureerd streamen naar externe services pushen
Metrische streaminggegevens kunnen worden gepusht naar externe services voor waarschuwingen of gebruiksvoorbeelden voor dashboards met behulp van de interface streamingquerylistener van Apache Spark. In Databricks Runtime 11.3 LTS en hoger is de Streaming Query Listener beschikbaar in Python en Scala.
Belangrijk
Referenties en objecten die worden beheerd door Unity Catalog kunnen niet worden gebruikt in StreamingQueryListener
logica.
Notitie
De verwerkingslatentie met listeners kan aanzienlijk van invloed zijn op de verwerkingssnelheden van query's. Het wordt aangeraden om de verwerkingslogica in deze listeners te beperken en ervoor te kiezen om te schrijven naar snelle responssystemen zoals Kafka voor efficiëntie.
De volgende code bevat basisvoorbeelden van de syntaxis voor het implementeren van een listener:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Waarneembare metrische gegevens definiëren in Structured Streaming
Waarneembare metrische gegevens worden willekeurige statistische functies genoemd die kunnen worden gedefinieerd voor een query (DataFrame). Zodra de uitvoering van een DataFrame een voltooiingspunt bereikt (dat wil gezegd, voltooit u een batchquery of bereikt u een streaming-epoch), wordt een benoemde gebeurtenis verzonden die de metrische gegevens bevat voor de gegevens die zijn verwerkt sinds het laatste voltooiingspunt.
U kunt deze metrische gegevens bekijken door een listener toe te voegen aan de Spark-sessie. De listener is afhankelijk van de uitvoeringsmodus:
Batchmodus: Gebruiken
QueryExecutionListener
.QueryExecutionListener
wordt aangeroepen wanneer de query is voltooid. Toegang tot de metrische gegevens met behulp van deQueryExecution.observedMetrics
kaart.Streaming of microbatch: Gebruik
StreamingQueryListener
.StreamingQueryListener
wordt aangeroepen wanneer de streamingquery een periode voltooit. Toegang tot de metrische gegevens met behulp van deStreamingQueryProgress.observedMetrics
kaart. Azure Databricks biedt geen ondersteuning voor continue uitvoering van streaming.
Voorbeeld:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metrische gegevens van streamingQueryListener-objecten
Metrisch | Beschrijving |
---|---|
id |
Een unieke query-id die blijft bestaan tijdens het opnieuw opstarten. |
runId |
Een query-id die uniek is voor elke start/herstart. Zie StreamingQuery.runId(). |
name |
De door de gebruiker opgegeven naam van de query. De naam is null als er geen naam is opgegeven. |
timestamp |
De tijdstempel voor de uitvoering van de microbatch. |
batchId |
Een unieke id voor de huidige batch met gegevens die worden verwerkt. In het geval van nieuwe pogingen na een fout, kan een bepaalde batch-id meerdere keren worden uitgevoerd. Op dezelfde manier wordt de batch-id niet verhoogd wanneer er geen gegevens moeten worden verwerkt. |
numInputRows |
Het geaggregeerde aantal records (in alle bronnen) dat in een trigger is verwerkt. |
inputRowsPerSecond |
De cumulatieve snelheid (voor alle bronnen) van binnenkomende gegevens. |
processedRowsPerSecond |
De cumulatieve snelheid (voor alle bronnen) waarmee Spark gegevens verwerkt. |
durationMs-object
Informatie over de tijd die nodig is om verschillende fasen van het microbatch-uitvoeringsproces te voltooien.
Metrisch | Beschrijving |
---|---|
durationMs.addBatch |
De tijd die nodig was om de microbatch uit te voeren. Dit sluit de tijd die Spark nodig heeft om de microbatch te plannen. |
durationMs.getBatch |
De tijd die nodig is om de metagegevens over de offsets van de bron op te halen. |
durationMs.latestOffset |
De meest recente offset die wordt verbruikt voor de microbatch. Dit voortgangsobject verwijst naar de tijd die nodig is om de meest recente offset van bronnen op te halen. |
durationMs.queryPlanning |
De tijd die nodig is om het uitvoeringsplan te genereren. |
durationMs.triggerExecution |
De tijd die nodig is om de microbatch te plannen en uit te voeren. |
durationMs.walCommit |
De tijd die nodig is om de nieuwe beschikbare offsets vast te leggen. |
eventTime-object
Informatie over de waarde van de gebeurtenistijd die wordt gezien in de gegevens die in de microbatch worden verwerkt. Deze gegevens worden door het watermerk gebruikt om erachter te komen hoe u de status kunt bijsnijden voor het verwerken van stateful aggregaties die zijn gedefinieerd in de structured streaming-taak.
Metrisch | Beschrijving |
---|---|
eventTime.avg |
De gemiddelde gebeurtenistijd die in die trigger wordt gezien. |
eventTime.max |
De maximale gebeurtenistijd die in die trigger wordt weergegeven. |
eventTime.min |
De minimale gebeurtenistijd die in die trigger wordt weergegeven. |
eventTime.watermark |
De waarde van het watermerk dat in die trigger wordt gebruikt. |
stateOperators-object
Informatie over de stateful bewerkingen die zijn gedefinieerd in de Structured Streaming-taak en de aggregaties die daaruit worden geproduceerd.
Metrisch | Beschrijving |
---|---|
stateOperators.operatorName |
De naam van de stateful operator waarop de metrische gegevens betrekking hebben, zoals symmetricHashJoin , dedupe stateStoreSave . |
stateOperators.numRowsTotal |
Het totale aantal rijen met status als gevolg van een stateful operator of aggregatie. |
stateOperators.numRowsUpdated |
Het totale aantal rijen dat in de status is bijgewerkt als gevolg van een stateful operator of aggregatie. |
stateOperators.allUpdatesTimeMs |
Deze metrische waarde is momenteel niet meetbaar door Spark en is gepland om te worden verwijderd in toekomstige updates. |
stateOperators.numRowsRemoved |
Het totale aantal rijen dat uit de status is verwijderd als gevolg van een stateful operator of aggregatie. |
stateOperators.allRemovalsTimeMs |
Deze metrische waarde is momenteel niet meetbaar door Spark en is gepland om te worden verwijderd in toekomstige updates. |
stateOperators.commitTimeMs |
De tijd die nodig is om alle updates door te voeren (plaatst en verwijdert) en een nieuwe versie retourneert. |
stateOperators.memoryUsedBytes |
Geheugen dat wordt gebruikt door het statusarchief. |
stateOperators.numRowsDroppedByWatermark |
Het aantal rijen dat te laat wordt beschouwd om te worden opgenomen in een stateful aggregatie. Alleen streamingaggregaties: het aantal rijen dat na aggregatie is verwijderd (geen onbewerkte invoerrijen). Dit getal is niet nauwkeurig, maar geeft een indicatie dat er late gegevens worden verwijderd. |
stateOperators.numShufflePartitions |
Het aantal willekeurige partities voor deze stateful operator. |
stateOperators.numStateStoreInstances |
Het werkelijke exemplaar van het statusarchief dat de operator heeft geïnitialiseerd en onderhouden. Voor veel stateful operators is dit hetzelfde als het aantal partities. Stream-stream-joins initialiseren echter vier exemplaren van het statusarchief per partitie. |
stateOperators.customMetrics-object
Gegevens die worden verzameld uit RocksDB die metrische gegevens vastleggen over de prestaties en bewerkingen met betrekking tot de stateful waarden die worden onderhouden voor de structured streaming-taak. Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie.
Metrisch | Beschrijving |
---|---|
customMetrics.rocksdbBytesCopied |
Het aantal bytes dat is gekopieerd als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbCommitCheckpointLatency |
De tijd in milliseconden die een momentopname maken van systeemeigen RocksDB en deze naar een lokale map schrijven. |
customMetrics.rocksdbCompactLatency |
De tijd in milliseconden comprimeren (optioneel) tijdens het doorvoeren van het controlepunt. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
De tijd in milliseconden die de systeemeigen RocksDB-momentopname synchroniseert met externe opslag (de locatie van het controlepunt). |
customMetrics.rocksdbCommitFlushLatency |
De tijd in milliseconden die de RocksDB-in-memory wijzigingen in het geheugen leegmaken aan de lokale schijf. |
customMetrics.rocksdbCommitPauseLatency |
De tijd in milliseconden stopt de achtergrondwerkrolthreads als onderdeel van de controlepuntdoorvoering, zoals voor compressie. |
customMetrics.rocksdbCommitWriteBatchLatency |
De tijd in milliseconden die de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch ) toepassen op systeemeigen RocksDB. |
customMetrics.rocksdbFilesCopied |
Het aantal bestanden dat is gekopieerd als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbFilesReused |
Het aantal bestanden dat opnieuw wordt gebruikt als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbGetCount |
Het aantal get aanroepen naar de database (bevat gets WriteBatch geen batch in het geheugen die wordt gebruikt voor faseringsschrijfbewerkingen). |
customMetrics.rocksdbGetLatency |
De gemiddelde tijd in nanoseconden voor de onderliggende systeemeigen RocksDB::Get aanroep. |
customMetrics.rocksdbReadBlockCacheHitCount |
Het aantal cachetreffers uit de blokcache in RocksDB die handig zijn bij het voorkomen van leesbewerkingen van lokale schijven. |
customMetrics.rocksdbReadBlockCacheMissCount |
Het aantal blokcaches in RocksDB is niet handig bij het voorkomen van leesbewerkingen van lokale schijven. |
customMetrics.rocksdbSstFileSize |
De grootte van alle SST-bestanden (Static Sorted Table) - de tabellaire structuur RocksDB gebruikt om gegevens op te slaan. |
customMetrics.rocksdbTotalBytesRead |
Het aantal niet-gecomprimeerde bytes dat door get bewerkingen wordt gelezen. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Het aantal bytes dat door het compressieproces van de schijf wordt gelezen. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Het totale aantal bytes aan niet-gecomprimeerde gegevens dat wordt gelezen met behulp van een iterator. Voor sommige stateful bewerkingen (bijvoorbeeld time-outverwerking in FlatMapGroupsWithState en watermerking) moeten gegevens in DB worden gelezen via een iterator. |
customMetrics.rocksdbTotalBytesWritten |
Het totale aantal niet-gecomprimeerde bytes dat is geschreven door put bewerkingen. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Het totale aantal bytes dat door het compressieproces naar de schijf wordt geschreven. |
customMetrics.rocksdbTotalCompactionLatencyMs |
De tijd in milliseconden voor RocksDB-compressies, inclusief achtergrondcompressies en de optionele compressie die tijdens het doorvoeren is geïnitieerd. |
customMetrics.rocksdbTotalFlushLatencyMs |
De totale spoeltijd, inclusief het leegmaken van de achtergrond. Flush-bewerkingen zijn processen waarmee de MemTable leeggemaakte bewerkingen naar de opslag worden gespoeld zodra deze vol is. MemTables zijn het eerste niveau waar gegevens worden opgeslagen in RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
De grootte in bytes van de niet-gecomprimeerde zip-bestanden, zoals gerapporteerd door Bestandsbeheer. Bestandsbeheer beheert het gebruik en de verwijdering van de fysieke SST-bestandsschijfruimte. |
bronobject (Kafka)
Metrisch | Beschrijving |
---|---|
sources.description |
Een gedetailleerde beschrijving van de Kafka-bron, waarin het exacte Kafka-onderwerp wordt opgegeven waaruit wordt gelezen. Voorbeeld: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset object |
Het beginverschilnummer in het Kafka-onderwerp waarop de streamingtaak is gestart. |
sources.endOffset object |
De laatste offset verwerkt door de microbatch. Dit kan gelijk zijn aan latestOffset een doorlopende uitvoering van microbatch. |
sources.latestOffset object |
De meest recente offset van de microbatch. Het microbatchingproces verwerkt mogelijk niet alle offsets wanneer er beperking is, wat resulteert in endOffset en latestOffset differentiëren. |
sources.numInputRows |
Het aantal invoerrijen dat uit deze bron is verwerkt. |
sources.inputRowsPerSecond |
De snelheid waarmee gegevens binnenkomen voor verwerking vanuit deze bron. |
sources.processedRowsPerSecond |
De snelheid waarmee Spark gegevens van deze bron verwerkt. |
sources.metrics-object (Kafka)
Metrisch | Beschrijving |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Het gemiddelde aantal offsets dat de streamingquery achter de meest recente beschikbare offset ligt voor alle geabonneerde onderwerpen. |
sources.metrics.estimatedTotalBytesBehindLatest |
Het geschatte aantal bytes dat het queryproces niet heeft verbruikt uit de geabonneerde onderwerpen. |
sources.metrics.maxOffsetsBehindLatest |
Het maximum aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen. |
sources.metrics.minOffsetsBehindLatest |
Het minimale aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen. |
sink-object (Kafka)
Metrisch | Beschrijving |
---|---|
sink.description |
De beschrijving van de Kafka-sink waarnaar de streamingquery schrijft, waarin de specifieke Kafka-sink-implementatie wordt beschreven die wordt gebruikt. Voorbeeld: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Het aantal rijen dat als onderdeel van de microbatch naar de uitvoertabel of sink is geschreven. In sommige situaties kan deze waarde '-1' zijn en over het algemeen worden geïnterpreteerd als 'onbekend'. |
bronobject (Delta Lake)
Metrisch | Beschrijving |
---|---|
sources.description |
De beschrijving van de bron waaruit de streamingquery leest. Voorbeeld: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
De versie van serialisatie waarmee deze offset wordt gecodeerd. |
sources.[startOffset/endOffset].reservoirId |
De id van de tabel die wordt gelezen. Dit wordt gebruikt om onjuiste configuratie te detecteren bij het opnieuw opstarten van een query. |
sources.[startOffset/endOffset].reservoirVersion |
De versie van de tabel die momenteel wordt verwerkt. |
sources.[startOffset/endOffset].index |
De index in de volgorde van AddFiles in deze versie. Dit wordt gebruikt om grote doorvoeringen in meerdere batches op te splitsen. Deze index wordt gemaakt door te sorteren op modificationTimestamp en path . |
sources.[startOffset/endOffset].isStartingVersion |
Hiermee wordt aangegeven of de huidige offset het begin van een nieuwe streamingquery markeert in plaats van de verwerking van wijzigingen die zijn opgetreden nadat de initiële gegevens zijn verwerkt. Wanneer u een nieuwe query start, worden alle gegevens die aanwezig zijn in de tabel aan het begin eerst verwerkt en vervolgens alle nieuwe gegevens die binnenkomen. |
sources.latestOffset |
De meest recente offset die wordt verwerkt door de microbatch-query. |
sources.numInputRows |
Het aantal invoerrijen dat uit deze bron is verwerkt. |
sources.inputRowsPerSecond |
De snelheid waarmee gegevens binnenkomen voor verwerking vanuit deze bron. |
sources.processedRowsPerSecond |
De snelheid waarmee Spark gegevens van deze bron verwerkt. |
sources.metrics.numBytesOutstanding |
De gecombineerde grootte van de openstaande bestanden (bestanden bijgehouden door RocksDB). Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron. |
sources.metrics.numFilesOutstanding |
Het aantal openstaande bestanden dat moet worden verwerkt. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron. |
sink-object (Delta Lake)
Metrisch | Beschrijving |
---|---|
sink.description |
De beschrijving van de Delta-sink, met informatie over de specifieke Implementatie van de Delta-sink die wordt gebruikt. Voorbeeld: “DeltaSink[table]” . |
sink.numOutputRows |
Het aantal rijen is altijd -1, omdat Spark geen uitvoerrijen voor DSv1-sinks kan afleiden. Dit is de classificatie voor de Delta Lake-sink. |
Voorbeelden
Voorbeeld van kafka-naar-Kafka StreamingQueryListener-gebeurtenis
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Voorbeeld van een Delta Lake-naar-Delta Lake StreamingQueryListener-gebeurtenis
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Voorbeeld van de gebeurtenis Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Voorbeeld van kafka+Delta Lake-to-Delta Lake StreamingQueryListener-gebeurtenis
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Voorbeeld van frequentiebron naar Delta Lake StreamingQueryListener-gebeurtenis
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}