Query's voor gestructureerd streamen bewaken in Azure Databricks
Azure Databricks biedt ingebouwde bewaking voor structured streaming-toepassingen via de Spark-gebruikersinterface op het tabblad Streaming .
Structured Streaming-query's onderscheiden in de Spark-gebruikersinterface
Geef uw streams een unieke querynaam op door toe te voegen .queryName(<query-name>)
aan uw writeStream
code om eenvoudig te onderscheiden welke metrische gegevens tot welke stream behoren in de Spark-gebruikersinterface.
Metrische gegevens over gestructureerd streamen naar externe services pushen
Metrische streaminggegevens kunnen worden gepusht naar externe services voor waarschuwingen of gebruiksvoorbeelden voor dashboards met behulp van de interface streamingquerylistener van Apache Spark. In Databricks Runtime 11.3 LTS en hoger is de Streaming Query Listener beschikbaar in Python en Scala.
Belangrijk
Referenties en objecten die worden beheerd door Unity Catalog kunnen niet worden gebruikt in StreamingQueryListener
logica.
Notitie
De verwerkingslatentie die is gekoppeld aan listeners, kan de verwerking van query's nadelig beïnvloeden. Databricks raadt aan om verwerkingslogica in deze listeners te minimaliseren en te schrijven naar sinks met lage latentie, zoals Kafka.
De volgende code bevat basisvoorbeelden van de syntaxis voor het implementeren van een listener:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Waarneembare metrische gegevens definiëren in Structured Streaming
Waarneembare metrische gegevens worden willekeurige statistische functies genoemd die kunnen worden gedefinieerd voor een query (DataFrame). Zodra de uitvoering van een DataFrame een voltooiingspunt bereikt (dat wil gezegd, voltooit u een batchquery of bereikt u een streaming-epoch), wordt een benoemde gebeurtenis verzonden die de metrische gegevens bevat voor de gegevens die zijn verwerkt sinds het laatste voltooiingspunt.
U kunt deze metrische gegevens bekijken door een listener toe te voegen aan de Spark-sessie. De listener is afhankelijk van de uitvoeringsmodus:
Batchmodus: Gebruiken
QueryExecutionListener
.QueryExecutionListener
wordt aangeroepen wanneer de query is voltooid. Toegang tot de metrische gegevens met behulp van deQueryExecution.observedMetrics
kaart.Streaming of microbatch: Gebruik
StreamingQueryListener
.StreamingQueryListener
wordt aangeroepen wanneer de streamingquery een periode voltooit. Toegang tot de metrische gegevens met behulp van deStreamingQueryProgress.observedMetrics
kaart. Azure Databricks biedt geen ondersteuning voor continue uitvoering van streaming.
Voorbeeld:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metrische gegevens van streamingQueryListener-objecten
Metrisch | Beschrijving |
---|---|
id |
Unieke query-id die zich blijft voordoen bij opnieuw opstarten. Zie StreamingQuery.id(). |
runId |
Unieke query-id voor elke start of herstart. Zie StreamingQuery.runId(). |
name |
Door de gebruiker opgegeven naam van de query. Null indien niet opgegeven. |
timestamp |
Tijdstempel voor de uitvoering van de microbatch. |
batchId |
Unieke id voor de huidige batch met gegevens die worden verwerkt. Houd er rekening mee dat in het geval van nieuwe pogingen na een fout een bepaalde batch-id meerdere keren kan worden uitgevoerd. Op dezelfde manier wordt de batch-id niet verhoogd wanneer er geen gegevens moeten worden verwerkt. |
numInputRows |
Aggregeren (in alle bronnen) het aantal records dat in een trigger is verwerkt. |
inputRowsPerSecond |
Aggregeren (in alle bronnen) het aantal binnenkomende gegevens. |
processedRowsPerSecond |
Aggregeringssnelheid (voor alle bronnen) waarbij Spark gegevens verwerkt. |
durationMs-object
Informatie over de tijd die nodig is om verschillende fasen van het microbatchuitvoeringsproces te voltooien.
Metrisch | Beschrijving |
---|---|
durationMs.addBatch |
De tijd die nodig is om de microbatch uit te voeren. Dit sluit de tijd die Spark nodig heeft om de microbatch te plannen. |
durationMs.getBatch |
De tijd die nodig is om de metagegevens over de offsets van de bron op te halen. |
durationMs.latestOffset |
Meest recente offset verbruikt voor de microbatch. Dit voortgangsobject verwijst naar de tijd die nodig is om de meest recente offset van bronnen op te halen. |
durationMs.queryPlanning |
De tijd die nodig is om het uitvoeringsplan te genereren. |
durationMs.triggerExecution |
De tijd die nodig is om de microbatch te plannen en uit te voeren. |
durationMs.walCommit |
De tijd die nodig is om de nieuwe beschikbare offsets vast te leggen. |
eventTime-object
Informatie over de waarde van de gebeurtenistijd die wordt weergegeven in de gegevens die in de microbatch worden verwerkt. Deze gegevens worden door het watermerk gebruikt om erachter te komen hoe u de status kunt bijsnijden voor het verwerken van stateful aggregaties die zijn gedefinieerd in de structured streaming-taak.
Metrisch | Beschrijving |
---|---|
eventTime.avg |
De gemiddelde gebeurtenistijd die in de trigger wordt weergegeven. |
eventTime.max |
Maximale gebeurtenistijd die wordt weergegeven in de trigger. |
eventTime.min |
Minimale gebeurtenistijd die wordt weergegeven in de trigger. |
eventTime.watermark |
Waarde van het watermerk dat in de trigger wordt gebruikt. |
stateOperators-object
Informatie over de stateful bewerkingen die zijn gedefinieerd in de Structured Streaming-taak en de aggregaties die daaruit worden geproduceerd.
Metrisch | Beschrijving |
---|---|
stateOperators.operatorName |
Naam van de stateful operator waarmee de metrische gegevens betrekking hebben. Bijvoorbeeld, symmetricHashJoin , dedupe . stateStoreSave |
stateOperators.numRowsTotal |
Het aantal rijen in de status als gevolg van de stateful operator of aggregatie. |
stateOperators.numRowsUpdated |
Het aantal rijen dat in de status is bijgewerkt als gevolg van de stateful operator of aggregatie. |
stateOperators.numRowsRemoved |
Het aantal rijen dat uit de status is verwijderd als gevolg van de stateful operator of aggregatie. |
stateOperators.commitTimeMs |
De tijd die nodig is om alle updates door te voeren (plaatst en verwijdert) en een nieuwe versie retourneert. |
stateOperators.memoryUsedBytes |
Geheugen dat wordt gebruikt door het statusarchief. |
stateOperators.numRowsDroppedByWatermark |
Het aantal rijen dat te laat wordt beschouwd om te worden opgenomen in de stateful aggregatie. Alleen streamingaggregaties: het aantal rijen dat na de aggregatie is verwijderd en geen onbewerkte invoerrijen. Het getal is niet nauwkeurig, maar kan erop wijzen dat late gegevens worden verwijderd. |
stateOperators.numShufflePartitions |
Aantal willekeurige partities voor deze stateful operator. |
stateOperators.numStateStoreInstances |
Het werkelijke exemplaar van het statusarchief dat de operator heeft geïnitialiseerd en onderhouden. In veel stateful operators is dit hetzelfde als het aantal partities, maar stream-stream join initialiseert vier statusopslagexemplaren per partitie. |
stateOperators.customMetrics-object
Informatie verzameld uit RocksDB die metrische gegevens vastlegt over de prestaties en bewerkingen met betrekking tot de stateful waarden die worden onderhouden voor de Structured Streaming-taak. Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie.
Metrisch | Beschrijving |
---|---|
customMetrics.rocksdbBytesCopied |
Het aantal bytes dat is gekopieerd als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbCommitCheckpointLatency |
Tijd in milliseconden om een momentopname van systeemeigen RocksDB te maken en naar een lokale map te schrijven. |
customMetrics.rocksdbCompactLatency |
Tijd in milliseconden voor compressie (optioneel) tijdens het doorvoeren van het controlepunt. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Tijd in milliseconden om de systeemeigen RocksDB-momentopname gerelateerde bestanden te synchroniseren met een externe opslag (controlepuntlocatie). |
customMetrics.rocksdbCommitFlushLatency |
Tijd in milliseconden om de wijzigingen in het geheugen van RocksDB in het geheugen op uw lokale schijf leeg te maken. |
customMetrics.rocksdbCommitPauseLatency |
Tijd in milliseconden om de achtergrondwerkrolthreads (bijvoorbeeld voor compressie) te stoppen als onderdeel van de controlepuntdoorvoering. |
customMetrics.rocksdbCommitWriteBatchLatency |
Tijd in milliseconden om de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch ) toe te passen op systeemeigen RocksDB. |
customMetrics.rocksdbFilesCopied |
Het aantal bestanden dat is gekopieerd als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbFilesReused |
Het aantal bestanden dat opnieuw wordt gebruikt als bijgehouden door RocksDB File Manager. |
customMetrics.rocksdbGetCount |
get Aantal aanroepen naar de database (dit omvat gets niet vanWriteBatch : In-memory batch die wordt gebruikt voor faseringsschrijfbewerkingen). |
customMetrics.rocksdbGetLatency |
Gemiddelde tijd in nanoseconden voor de onderliggende systeemeigen RocksDB::Get aanroep. |
customMetrics.rocksdbReadBlockCacheHitCount |
Hoeveel van de blokcache in RocksDB is nuttig of niet en vermijdt leesbewerkingen van lokale schijven. |
customMetrics.rocksdbReadBlockCacheMissCount |
Hoeveel van de blokcache in RocksDB is nuttig of niet en vermijdt leesbewerkingen van lokale schijven. |
customMetrics.rocksdbSstFileSize |
Grootte van alle SST-bestanden. SST staat voor Statische gesorteerde tabel. Dit is de tabellaire structuur die RocksDB gebruikt om gegevens op te slaan. |
customMetrics.rocksdbTotalBytesRead |
Aantal niet-gecomprimeerde bytes gelezen door get bewerkingen. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Het aantal bytes dat door het compressieproces van de schijf wordt gelezen. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Voor sommige stateful bewerkingen (bijvoorbeeld time-outverwerking in FlatMapGroupsWithState en watermerken) moeten gegevens in DB worden gelezen via een iterator. Deze metrische waarde vertegenwoordigt de grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator. |
customMetrics.rocksdbTotalBytesWritten |
Aantal niet-gecomprimeerde bytes geschreven door put bewerkingen. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Het aantal bytes dat het compressieproces naar de schijf schrijft. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Tijd milliseconden voor RocksDB-compressies, inclusief achtergrondcompressies en de optionele compressie die tijdens de doorvoer is geïnitieerd. |
customMetrics.rocksdbTotalFlushLatencyMs |
Spoeltijd, inclusief het leegmaken van de achtergrond. Flush-bewerkingen zijn processen waarmee de MemTable wordt leeggemaakt naar de opslag zodra deze vol is. MemTables zijn het eerste niveau waarop gegevens worden opgeslagen in RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
RocksDB File Manager beheert het fysieke SST-bestandsruimtegebruik en -verwijdering. Deze metrische waarde vertegenwoordigt de niet-gecomprimeerde zip-bestanden in bytes, zoals gerapporteerd door Bestandsbeheer. |
bronobject (Kafka)
Metrisch | Beschrijving |
---|---|
sources.description |
De naam van de bron waaruit de streamingquery leest. Bijvoorbeeld: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset Object |
Begin van het offsetnummer in het Kafka-onderwerp waarmee de streamingtaak is gestart. |
sources.endOffset Object |
Meest recente offset verwerkt door de microbatch. Dit kan gelijk zijn aan latestOffset een doorlopende uitvoering van microbatch. |
sources.latestOffset Object |
De meest recente offset die wordt bepaald door de microbatch. Wanneer er sprake is van beperking, verwerkt het microbatchproces mogelijk niet alle offsets, waardoor endOffset en latestOffset verschilt. |
sources.numInputRows |
Het aantal invoerrijen dat uit deze bron is verwerkt. |
sources.inputRowsPerSecond |
Snelheid waarmee gegevens binnenkomen voor verwerking voor deze bron. |
sources.processedRowsPerSecond |
Snelheid waarmee Spark gegevens verwerkt voor deze bron. |
sources.metrics-object (Kafka)
Metrisch | Beschrijving |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Het gemiddelde aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen. |
sources.metrics.estimatedTotalBytesBehindLatest |
Het geschatte aantal bytes dat het queryproces niet heeft verbruikt vanuit de geabonneerde onderwerpen. |
sources.metrics.maxOffsetsBehindLatest |
Maximum aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen. |
sources.metrics.minOffsetsBehindLatest |
Minimaal aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen. |
sink-object (Kafka)
Metrisch | Beschrijving |
---|---|
sink.description |
De naam van de sink waar de streamingquery naar schrijft. Bijvoorbeeld: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Het aantal rijen dat als onderdeel van de microbatch naar de uitvoertabel of sink is geschreven. In sommige situaties kan deze waarde '-1' zijn en over het algemeen worden geïnterpreteerd als 'onbekend'. |
bronobject (Delta Lake)
Metrisch | Beschrijving |
---|---|
sources.description |
De naam van de bron waaruit de streamingquery leest. Bijvoorbeeld: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Versie van serialisatie waarmee deze offset is gecodeerd. |
sources.[startOffset/endOffset].reservoirId |
Id van de tabel waaruit u leest. Dit wordt gebruikt om onjuiste configuratie te detecteren bij het opnieuw opstarten van een query. |
sources.[startOffset/endOffset].reservoirVersion |
Versie van de tabel die u momenteel verwerkt. |
sources.[startOffset/endOffset].index |
Indexeren in de volgorde van AddFiles in deze versie. Dit wordt gebruikt om grote doorvoeringen in meerdere batches op te splitsen. Deze index wordt gemaakt door te sorteren op modificationTimestamp en path . |
sources.[startOffset/endOffset].isStartingVersion |
Of deze offset een query aangeeft die wordt gestart in plaats van wijzigingen te verwerken. Wanneer u een nieuwe query start, worden alle gegevens die aanwezig zijn in de tabel aan het begin verwerkt en vervolgens nieuwe gegevens die zijn aangekomen. |
sources.latestOffset |
Meest recente offset verwerkt door de microbatch-query. |
sources.numInputRows |
Het aantal invoerrijen dat uit deze bron is verwerkt. |
sources.inputRowsPerSecond |
Snelheid waarmee gegevens binnenkomen voor verwerking voor deze bron. |
sources.processedRowsPerSecond |
Snelheid waarmee Spark gegevens verwerkt voor deze bron. |
sources.metrics.numBytesOutstanding |
Grootte van de openstaande bestanden (bestanden bijgehouden door RocksDB) gecombineerd. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron. |
sources.metrics.numFilesOutstanding |
Aantal openstaande bestanden dat moet worden verwerkt. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron. |
sink-object (Delta Lake)
Metrisch | Beschrijving |
---|---|
sink.description |
Naam van de sink waarnaar de streamingquery schrijft. Bijvoorbeeld: “DeltaSink[table]” . |
sink.numOutputRows |
Het aantal rijen in deze metrische waarde is -1 omdat Spark geen uitvoerrijen voor DSv1-sinks kan afleiden. Dit is de classificatie voor de Delta Lake-sink. |
Voorbeelden
Voorbeeld van kafka-naar-Kafka StreamingQueryListener-gebeurtenis
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Voorbeeld van een Delta Lake-naar-Delta Lake StreamingQueryListener-gebeurtenis
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Voorbeeld van frequentiebron naar Delta Lake StreamingQueryListener-gebeurtenis
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}