Monitorování dotazů strukturovaného streamování v Azure Databricks
Azure Databricks poskytuje integrované monitorování pro aplikace strukturovaného streamování prostřednictvím uživatelského rozhraní Sparku na kartě Streamování .
Rozlišení dotazů strukturovaného streamování v uživatelském rozhraní Sparku
Zadejte jedinečný název dotazu přidáním .queryName(<query-name>)
do writeStream
kódu, abyste snadno rozlišili metriky, ke kterým streamům patří v uživatelském rozhraní Sparku.
Nabízení metrik strukturovaného streamování do externích služeb
Metriky streamování je možné odeslat do externích služeb pro upozorňování nebo případy použití řídicích panelů pomocí rozhraní naslouchacího procesu dotazů streamování Apache Sparku. Ve službě Databricks Runtime 11.3 LTS a novější je naslouchací proces streamování dotazů dostupný v Pythonu a Scala.
Důležité
Přihlašovací údaje a objekty spravované katalogem Unity nelze použít v StreamingQueryListener
logice.
Poznámka:
Latence zpracování s naslouchacími procesy může výrazně ovlivnit rychlost zpracování dotazů. Doporučujeme omezit logiku zpracování v těchto naslouchacích procesech a rozhodnout se psát do systémů rychlé odezvy, jako je Kafka, kvůli efektivitě.
Následující kód obsahuje základní příklady syntaxe pro implementaci naslouchacího procesu:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definování pozorovatelných metrik ve strukturovaném streamování
Pozorovatelné metriky se nazývají libovolné agregační funkce, které lze definovat v dotazu (DataFrame). Jakmile provádění datového rámce dosáhne bodu dokončení (tj. dokončí dávkové dotaz nebo dosáhne epochy streamování), vygeneruje se pojmenovaná událost obsahující metriky pro data zpracovávaná od posledního bodu dokončení.
Tyto metriky můžete sledovat připojením naslouchacího procesu k relaci Sparku. Naslouchací proces závisí na režimu spuštění:
Režim dávky: Použijte
QueryExecutionListener
.QueryExecutionListener
je volána po dokončení dotazu. Získejte přístup k metrikám pomocíQueryExecution.observedMetrics
mapy.Streamování nebo mikrobatch: Použijte
StreamingQueryListener
.StreamingQueryListener
se volá, když streamovací dotaz dokončí epochu. Získejte přístup k metrikám pomocíStreamingQueryProgress.observedMetrics
mapy. Azure Databricks nepodporuje streamování průběžného spouštění.
Příklad:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metriky objektů StreamingQueryListener
Metrický | Popis |
---|---|
id |
Jedinečné ID dotazu, které se zachová při restartování. |
runId |
ID dotazu, které je jedinečné pro každé spuštění/restartování. Viz StreamingQuery.runId(). |
name |
Uživatelem zadaný název dotazu. Název má hodnotu null, pokud není zadán žádný název. |
timestamp |
Časové razítko pro spuštění mikrobatchu. |
batchId |
Jedinečné ID pro aktuální dávku zpracovávaných dat. V případě opakování po selhání může být dané ID dávky provedeno vícekrát. Podobně platí, že pokud neexistují žádná data ke zpracování, ID dávky se nezvýší. |
numInputRows |
Agregace (napříč všemi zdroji) počtu záznamů zpracovaných v triggeru |
inputRowsPerSecond |
Míra příchozích dat (napříč všemi zdroji) |
processedRowsPerSecond |
Agregace (napříč všemi zdroji) s jakou Spark zpracovává data. |
DurationMs – objekt
Informace o době potřebnou k dokončení různých fází procesu provádění mikrobatchu.
Metrický | Popis |
---|---|
durationMs.addBatch |
Doba potřebná ke spuštění mikrobatchu. Tím se vyloučí čas potřebný k naplánování mikrobatchu. |
durationMs.getBatch |
Doba potřebnou k načtení metadat o posunech ze zdroje. |
durationMs.latestOffset |
Nejnovější posun spotřebovaný pro mikrobatch. Tento objekt průběhu odkazuje na čas potřebný k načtení nejnovějšího posunu ze zdrojů. |
durationMs.queryPlanning |
Doba potřebná ke generování plánu provádění. |
durationMs.triggerExecution |
Doba, která trvá naplánování a spuštění mikrobatchu. |
durationMs.walCommit |
Doba potřebná k potvrzení nových dostupných posunů. |
eventTime – objekt
Informace o hodnotě času události, které se zobrazují v datech zpracovávaných v mikrobatchu. Tato data vodoznak používá k zjištění, jak oříznout stav stavových agregací definovaných v úloze strukturovaného streamování.
Metrický | Popis |
---|---|
eventTime.avg |
Průměrná doba události zobrazená v této aktivační události. |
eventTime.max |
Maximální doba události zobrazená v daném triggeru. |
eventTime.min |
Minimální doba události zobrazená v této aktivační události. |
eventTime.watermark |
Hodnota vodoznaku použitého v této aktivační události. |
stateOperators – objekt
Informace o stavových operacích, které jsou definovány v úloze strukturovaného streamování a agregace vytvořené z nich.
Metrický | Popis |
---|---|
stateOperators.operatorName |
Název stavového operátoru, ke kterému se metriky vztahují, například symmetricHashJoin , dedupe . stateStoreSave |
stateOperators.numRowsTotal |
Celkový počet řádků ve stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.numRowsUpdated |
Celkový počet řádků aktualizovaných ve stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.allUpdatesTimeMs |
Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích. |
stateOperators.numRowsRemoved |
Celkový počet řádků odebraných ze stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.allRemovalsTimeMs |
Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích. |
stateOperators.commitTimeMs |
Doba potřebná k potvrzení všech aktualizací (vložení a odebrání) a vrácení nové verze. |
stateOperators.memoryUsedBytes |
Paměť používaná úložištěm stavů. |
stateOperators.numRowsDroppedByWatermark |
Počet řádků, které jsou považovány za příliš pozdě, aby se zahrnuly do stavové agregace. Pouze agregace streamování: Počet vynechaných řádků po agregaci (ne nezpracovaných vstupních řádků). Toto číslo není přesné, ale poskytuje indikaci, že dochází k pozdnímu vyřazení dat. |
stateOperators.numShufflePartitions |
Počet oddílů pro náhodné prohazování tohoto stavového operátoru. |
stateOperators.numStateStoreInstances |
Skutečná instance úložiště stavu, kterou operátor inicializoval a zachoval. U mnoha stavových operátorů je to stejné jako počet oddílů. Spojení stream-stream však inicializují čtyři instance úložiště stavu na oddíl. |
stateOperators.customMetrics – objekt
Informace shromážděné ze služby RocksDB zachycující metriky týkající se výkonu a operací s ohledem na stavové hodnoty, které udržuje pro úlohu strukturovaného streamování. Další informace najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.
Metrický | Popis |
---|---|
customMetrics.rocksdbBytesCopied |
Počet bajtů zkopírovaných podle sledování správcem souborů RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Čas v milisekundách pořídí snímek nativní databáze RocksDB a zapíše ho do místního adresáře. |
customMetrics.rocksdbCompactLatency |
Doba komprimace v milisekundách (volitelné) během potvrzení kontrolního bodu. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Čas synchronizace nativního snímku RocksDB s externím úložištěm (umístění kontrolního bodu) v milisekundách |
customMetrics.rocksdbCommitFlushLatency |
Čas v milisekundách vyprázdnění paměti RocksDB se změní na místní disk. |
customMetrics.rocksdbCommitPauseLatency |
Doba v milisekundách zastaví pracovní vlákna na pozadí jako součást potvrzení kontrolního bodu, například pro komprimace. |
customMetrics.rocksdbCommitWriteBatchLatency |
Čas v milisekundách, který používá fázované zápisy ve struktuře v paměti (WriteBatch ) na nativní rocksDB. |
customMetrics.rocksdbFilesCopied |
Počet souborů zkopírovaných podle sledování správcem souborů RocksDB. |
customMetrics.rocksdbFilesReused |
Početsouborůch |
customMetrics.rocksdbGetCount |
Počet get volání do databáze (nezahrnuje gets dávku WriteBatch v paměti použitou pro přípravné zápisy). |
customMetrics.rocksdbGetLatency |
Průměrná doba v nanosekundách základního nativního RocksDB::Get volání. |
customMetrics.rocksdbReadBlockCacheHitCount |
Počet přístupů do mezipaměti bloků ve službě RocksDB, které jsou užitečné při zabránění čtení místních disků. |
customMetrics.rocksdbReadBlockCacheMissCount |
Počet mezipaměti bloků ve službě RocksDB není užitečný při zabránění čtení místních disků. |
customMetrics.rocksdbSstFileSize |
Velikost všech souborů se statickou seřazenou tabulkou (SST) – tabulková struktura RocksDB používá k ukládání dat. |
customMetrics.rocksdbTotalBytesRead |
Počet nekomprimovaných bajtů přečtených operacemi get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Počet bajtů, které proces komprimace načítá z disku. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Celkový počet bajtů nekomprimovaných dat přečtených pomocí iterátoru. Některé stavové operace (například zpracování FlatMapGroupsWithState časového limitu a vodoznaku) vyžadují čtení dat v databázi prostřednictvím iterátoru. |
customMetrics.rocksdbTotalBytesWritten |
Celkový počet nekomprimovaných bajtů zapsaných operacemi put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Celkový počet bajtů, které proces komprimace zapisuje na disk. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Čas v milisekundách pro komprimace RocksDB, včetně komprimací pozadí a volitelné komprimace zahájené během potvrzení. |
customMetrics.rocksdbTotalFlushLatencyMs |
Celkový čas vyprázdnění, včetně vyprazdňování na pozadí. Operace vyprázdnění jsou procesy, kterými MemTable se vyprázdní do úložiště, jakmile je zaplněno. MemTables jsou první úroveň, ve které jsou data uložená ve službě RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Velikost v bajtech nekomprimovaných souborů ZIP hlášených správcem souborů. Správce souborů spravuje využití a odstranění fyzického místa na disku se soubory SST. |
source – objekt (Kafka)
Metrický | Popis |
---|---|
sources.description |
Podrobný popis zdroje Kafka, který určuje přesné téma Kafka, ze kterého se čte. Například: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset objekt |
Počáteční číslo posunu v tématu Kafka, ve kterém byla spuštěna úloha streamování. |
sources.endOffset objekt |
Poslední posun zpracovaný mikrobatchem. To se může rovnat latestOffset probíhajícímu provádění mikrobatchu. |
sources.latestOffset objekt |
Poslední posun z obrázku microbatch. Proces mikrobatchingu nemusí zpracovat všechny posuny v případě omezování, což vede k endOffset rozdílu a latestOffset rozdílu. |
sources.numInputRows |
Počet vstupních řádků zpracovaných z tohoto zdroje |
sources.inputRowsPerSecond |
Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje. |
sources.processedRowsPerSecond |
Rychlost, s jakou Spark zpracovává data z tohoto zdroje. |
sources.metrics – objekt (Kafka)
Metrický | Popis |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Průměrný počet posunů, které streamovací dotaz stojí za nejnovějším dostupným posunem mezi všemi předplatným tématy. |
sources.metrics.estimatedTotalBytesBehindLatest |
Odhadovaný početch |
sources.metrics.maxOffsetsBehindLatest |
Maximální počet posunů, které streamovací dotaz obsahuje, je za nejnovějším dostupným posunem mezi všemi předplatným tématy. |
sources.metrics.minOffsetsBehindLatest |
Minimální počet posunů, které streamovací dotaz stojí za nejnovějším dostupným posunem mezi všemi předplatným tématy. |
objekt jímky (Kafka)
Metrický | Popis |
---|---|
sink.description |
Popis jímky Kafka, do které dotaz streamování píše, podrobně popisuje konkrétní použitou implementaci jímky Kafka. Například: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Počet řádků, které byly zapsány do výstupní tabulky nebo jímky jako součást mikrobatchu. V některých situacích může být tato hodnota -1 a obecně ji lze interpretovat jako "neznámá". |
sources – objekt (Delta Lake)
Metrický | Popis |
---|---|
sources.description |
Popis zdroje, ze kterého streamovací dotaz čte. Například: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Verze serializace, se kterou je tento posun kódován. |
sources.[startOffset/endOffset].reservoirId |
ID přečtené tabulky. Používá se k detekci chybné konfigurace při restartování dotazu. |
sources.[startOffset/endOffset].reservoirVersion |
Verze tabulky, která se právě zpracovává |
sources.[startOffset/endOffset].index |
Index v posloupnosti AddFiles této verze. Slouží k přerušení velkých potvrzení do více dávek. Tento index je vytvořen řazením modificationTimestamp a path . |
sources.[startOffset/endOffset].isStartingVersion |
Určuje, zda aktuální posun označuje začátek nového streamovacího dotazu místo zpracování změn, ke kterým došlo po počátečním zpracování dat. Při spuštění nového dotazu se nejprve zpracuje všechna data, která jsou v tabulce na začátku, a pak všechna nová data, která dorazí. |
sources.latestOffset |
Nejnovější posun zpracovaný dotazem microbatch. |
sources.numInputRows |
Počet vstupních řádků zpracovaných z tohoto zdroje |
sources.inputRowsPerSecond |
Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje. |
sources.processedRowsPerSecond |
Rychlost, s jakou Spark zpracovává data z tohoto zdroje. |
sources.metrics.numBytesOutstanding |
Kombinovaná velikost nevyřízených souborů (soubory sledované rocksDB) Jedná se o metriku backlogu pro Rozdílový a automatický zavaděč jako zdroj streamování. |
sources.metrics.numFilesOutstanding |
Počet nevyřízených souborů, které se mají zpracovat. Jedná se o metriku backlogu pro Rozdílový a automatický zavaděč jako zdroj streamování. |
objekt jímky (Delta Lake)
Metrický | Popis |
---|---|
sink.description |
Popis jímky Delta podrobně popisuje konkrétní použitou implementaci jímky Delta. Například: “DeltaSink[table]” . |
sink.numOutputRows |
Počet řádků je vždy -1, protože Spark nemůže odvodit výstupní řádky pro jímky DSv1, což je klasifikace jímky Delta Lake. |
Příklady
Příklad události Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Příklad události Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Příklad události Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Příklad události Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Příklad zdroje rychlosti do události Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}