Monitorowanie zapytań strukturalnego przesyłania strumieniowego w usłudze Azure Databricks
Usługa Azure Databricks zapewnia wbudowane monitorowanie aplikacji do przesyłania strumieniowego ze strukturą za pośrednictwem interfejsu użytkownika platformy Spark na karcie Przesyłanie strumieniowe .
Rozróżnianie zapytań przesyłania strumieniowego ze strukturą w interfejsie użytkownika platformy Spark
Podaj unikatową nazwę zapytania, dodając .queryName(<query-name>)
do writeStream
kodu, aby łatwo odróżnić metryki, do których należy strumień w interfejsie użytkownika platformy Spark.
Wypychanie metryk przesyłania strumieniowego ze strukturą do usług zewnętrznych
Metryki przesyłania strumieniowego można wypychać do usług zewnętrznych na potrzeby zgłaszania alertów lub pulpitów nawigacyjnych przy użyciu interfejsu odbiornika zapytań przesyłania strumieniowego platformy Apache Spark. W środowisku Databricks Runtime 11.3 LTS i nowszym odbiornik zapytań przesyłania strumieniowego jest dostępny w językach Python i Scala.
Ważne
Nie można używać poświadczeń i obiektów zarządzanych przez wykaz aparatu Unity w StreamingQueryListener
logice.
Uwaga
Opóźnienie przetwarzania skojarzone z odbiornikami może mieć negatywny wpływ na przetwarzanie zapytań. Usługa Databricks zaleca zminimalizowanie logiki przetwarzania w tych odbiornikach i zapisywanie w ujściach o małych opóźnieniach, takich jak kafka.
Poniższy kod zawiera podstawowe przykłady składni implementowania odbiornika:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definiowanie obserwowalnych metryk w strumieniu ze strukturą
Obserwowane metryki są nazywane dowolnymi funkcjami agregacji, które można zdefiniować w zapytaniu (DataFrame). Gdy tylko wykonanie ramki danych osiągnie punkt ukończenia (czyli kończy zapytanie wsadowe lub osiąga epokę przesyłania strumieniowego), emitowane jest nazwane zdarzenie zawierające metryki dla danych przetworzonych od ostatniego punktu ukończenia.
Możesz obserwować te metryki, dołączając odbiornik do sesji platformy Spark. Odbiornik zależy od trybu wykonywania:
Tryb wsadowy: użyj polecenia
QueryExecutionListener
.QueryExecutionListener
jest wywoływana po zakończeniu zapytania. Uzyskaj dostęp do metryk przy użyciuQueryExecution.observedMetrics
mapy.Przesyłanie strumieniowe lub mikrosadowe: użyj polecenia
StreamingQueryListener
.StreamingQueryListener
jest wywoływana, gdy zapytanie przesyłane strumieniowo kończy epokę. Uzyskaj dostęp do metryk przy użyciuStreamingQueryProgress.observedMetrics
mapy. Usługa Azure Databricks nie obsługuje przesyłania strumieniowego ciągłego wykonywania.
Na przykład:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metryki obiektu StreamingQueryListener
Metryczne | opis |
---|---|
id |
Unikatowy identyfikator zapytania, który będzie się powtarzać po ponownym uruchomieniu. Zobacz StreamingQuery.id(). |
runId |
Unikatowy identyfikator zapytania dla każdego uruchomienia lub ponownego uruchomienia. Zobacz StreamingQuery.runId(). |
name |
Określona przez użytkownika nazwa zapytania. Wartość null, jeśli nie zostanie określona. |
timestamp |
Sygnatura czasowa wykonania mikrosadowej partii. |
batchId |
Unikatowy identyfikator bieżącej partii przetwarzanych danych. Należy pamiętać, że w przypadku ponownych prób po awarii dany identyfikator partii można wykonać więcej niż raz. Podobnie, gdy nie ma danych do przetworzenia, identyfikator partii nie jest zwiększany. |
numInputRows |
Zagregowana (we wszystkich źródłach) liczba rekordów przetworzonych w wyzwalaczu. |
inputRowsPerSecond |
Zagregowany (we wszystkich źródłach) współczynnik przychodzących danych. |
processedRowsPerSecond |
Agregacja (we wszystkich źródłach) szybkość przetwarzania danych przez platformę Spark. |
durationMs, obiekt
Informacje o czasie potrzebnym na ukończenie różnych etapów procesu wykonywania mikrosadowego.
Metryczne | opis |
---|---|
durationMs.addBatch |
Czas potrzebny na wykonanie mikrobajtu. Wyklucza to czas, przez który platforma Spark planuje mikrobajt. |
durationMs.getBatch |
Czas potrzebny na pobranie metadanych dotyczących przesunięć ze źródła. |
durationMs.latestOffset |
Najnowsze przesunięcie używane dla mikrobatchu. Ten obiekt postępu odnosi się do czasu potrzebnego na pobranie najnowszego przesunięcia ze źródeł. |
durationMs.queryPlanning |
Czas potrzebny na wygenerowanie planu wykonania. |
durationMs.triggerExecution |
Czas potrzebny na zaplanowanie i wykonanie mikrobatchu. |
durationMs.walCommit |
Czas potrzebny na zatwierdzenie nowych dostępnych przesunięć. |
eventTime, obiekt
Informacje o wartości czasu zdarzenia widocznej w danych przetwarzanych w mikrosadowej partii. Te dane są używane przez znak wodny, aby dowiedzieć się, jak przyciąć stan przetwarzania agregacji stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą.
Metryczne | opis |
---|---|
eventTime.avg |
Średni czas zdarzenia widoczny w wyzwalaczu. |
eventTime.max |
Maksymalny czas zdarzenia widoczny w wyzwalaczu. |
eventTime.min |
Minimalny czas zdarzenia widoczny w wyzwalaczu. |
eventTime.watermark |
Wartość znaku wodnego używanego w wyzwalaczu. |
stateOperators, obiekt
Informacje o operacjach stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą i agregacjach generowanych z nich.
Metryczne | opis |
---|---|
stateOperators.operatorName |
Nazwa operatora stanowego, do którego odnoszą się metryki. Na przykład , symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Liczba wierszy w stanie w wyniku operatora stanowego lub agregacji. |
stateOperators.numRowsUpdated |
Liczba wierszy zaktualizowanych w stanie w wyniku operatora stanowego lub agregacji. |
stateOperators.numRowsRemoved |
Liczba wierszy usuniętych ze stanu w wyniku operatora stanowego lub agregacji. |
stateOperators.commitTimeMs |
Czas potrzebny na zatwierdzenie wszystkich aktualizacji (umieszcza i usuwa) i zwraca nową wersję. |
stateOperators.memoryUsedBytes |
Pamięć używana przez magazyn stanów. |
stateOperators.numRowsDroppedByWatermark |
Liczba wierszy, które są uznawane za za późno, aby zostały uwzględnione w agregacji stanowej. Tylko agregacje przesyłania strumieniowego: liczba wierszy porzuconych po agregacji, a nie nieprzetworzonych wierszy wejściowych. Liczba nie jest dokładna, ale może wskazywać, że opóźnione dane są usuwane. |
stateOperators.numShufflePartitions |
Liczba partycji mieszania dla tego operatora stanowego. |
stateOperators.numStateStoreInstances |
Rzeczywiste wystąpienie magazynu stanów, które operator zainicjował i konserwował. W wielu operatorach stanowych jest to samo co liczba partycji, ale sprzężenia strumienia inicjuje cztery wystąpienia magazynu stanów na partycję. |
stateOperators.customMetrics, obiekt
Informacje zebrane z bazy danych RocksDB, które przechwytują metryki dotyczące wydajności i operacji w odniesieniu do wartości stanowych, które są przechowywane dla zadania przesyłania strumieniowego ze strukturą. Aby uzyskać więcej informacji, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).
Metryczne | opis |
---|---|
customMetrics.rocksdbBytesCopied |
Liczba bajtów skopiowanych jako śledzonych przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Czas w milisekundach, aby utworzyć migawkę natywnej bazy danych RocksDB i zapisać ją w katalogu lokalnym. |
customMetrics.rocksdbCompactLatency |
Czas w milisekundach kompaktowania (opcjonalnie) podczas zatwierdzania punktu kontrolnego. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Czas w milisekundach w celu zsynchronizowania natywnej migawki bazy danych RocksDB z zewnętrznym magazynem (lokalizacja punktu kontrolnego). |
customMetrics.rocksdbCommitFlushLatency |
Czas w milisekundach opróżniania zmian w pamięci bazy danych RocksDB na dysku lokalnym. |
customMetrics.rocksdbCommitPauseLatency |
Czas w milisekundach, aby zatrzymać wątki procesu roboczego w tle (na przykład w przypadku kompaktowania) w ramach zatwierdzenia punktu kontrolnego. |
customMetrics.rocksdbCommitWriteBatchLatency |
Czas w milisekundach, aby zastosować zapisy etapowe w strukturze w pamięci (WriteBatch ) do natywnej bazy danych RocksDB. |
customMetrics.rocksdbFilesCopied |
Liczba plików skopiowanych zgodnie z śledzeniem przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbFilesReused |
Liczba ponownie użytych plików, które są śledzone przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbGetCount |
Liczba wywołań get bazy danych (nie obejmuje gets WriteBatch to wsadu w pamięci używanej do przejściowych zapisów). |
customMetrics.rocksdbGetLatency |
Średni czas w nanosekundach dla bazowego wywołania natywnego RocksDB::Get . |
customMetrics.rocksdbReadBlockCacheHitCount |
Ilość pamięci podręcznej blokowej w bazie danych RocksDB jest przydatna lub nie i unika odczytu dysku lokalnego. |
customMetrics.rocksdbReadBlockCacheMissCount |
Ilość pamięci podręcznej blokowej w bazie danych RocksDB jest przydatna lub nie i unika odczytu dysku lokalnego. |
customMetrics.rocksdbSstFileSize |
Rozmiar wszystkich plików SST. SST oznacza statyczną tabelę sortowaną, która jest strukturą tabelaryczna RocksDB używa do przechowywania danych. |
customMetrics.rocksdbTotalBytesRead |
Liczba nieskompresowanych bajtów odczytanych według get operacji. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Liczba bajtów odczytywanych przez proces kompaktowania z dysku. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Niektóre operacje stanowe (na przykład przetwarzanie limitu czasu i FlatMapGroupsWithState znakowanie wodne) wymagają odczytywania danych w bazie danych za pomocą iteratora. Ta metryka reprezentuje rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora. |
customMetrics.rocksdbTotalBytesWritten |
Liczba nieskompresowanych bajtów zapisanych przez put operacje. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Liczba bajtów zapisu procesu kompaktowania na dysku. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Milisekundy czasu dla kompaktacji bazy danych RocksDB, w tym kompaktowania w tle i opcjonalnego kompaktowania zainicjowanego podczas zatwierdzania. |
customMetrics.rocksdbTotalFlushLatencyMs |
Opróżnianie czasu, w tym opróżnianie tła. Operacje opróżniania to procesy, za pomocą których tabela MemTable jest opróżniona do magazynu po jej zapełnieniu. Tabele MemTable to pierwszy poziom, na którym dane są przechowywane w bazie danych RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Menedżer plików rocksDB zarządza fizycznym wykorzystaniem i usunięciem miejsca na dysku pliku SST. Ta metryka reprezentuje nieskompresowane pliki zip w bajtach zgłoszonych przez Menedżera plików. |
sources object (Kafka)
Metryczne | opis |
---|---|
sources.description |
Nazwa źródła odczytywanego zapytania przesyłania strumieniowego. Na przykład “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset Obiektu |
Początkowy numer przesunięcia w temacie platformy Kafka, na który uruchomiono zadanie przesyłania strumieniowego. |
sources.endOffset Obiektu |
Najnowsze przesunięcie przetworzone przez mikrobabajt. Może to być równe latestOffset dla trwającego wykonywania mikrobajta. |
sources.latestOffset Obiektu |
Najnowsze przesunięcie obliczone przez mikrobajt. W przypadku ograniczania przepływności proces mikrosadowania może nie przetwarzać wszystkich przesunięć, powodując różnice ilatestOffset .endOffset |
sources.numInputRows |
Liczba wierszy wejściowych przetworzonych z tego źródła. |
sources.inputRowsPerSecond |
Szybkość, z jaką dane docierają do przetwarzania dla tego źródła. |
sources.processedRowsPerSecond |
Szybkość przetwarzania danych przez platformę Spark dla tego źródła. |
sources.metrics, obiekt (Kafka)
Metryczne | opis |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Średnia liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
sources.metrics.estimatedTotalBytesBehindLatest |
Szacowana liczba bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów. |
sources.metrics.maxOffsetsBehindLatest |
Maksymalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
sources.metrics.minOffsetsBehindLatest |
Minimalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
obiekt ujścia (Kafka)
Metryczne | opis |
---|---|
sink.description |
Nazwa ujścia, do których zapisuje się zapytanie przesyłane strumieniowo. Na przykład “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Liczba wierszy zapisanych w tabeli wyjściowej lub ujściu w ramach mikrobatchu. W niektórych sytuacjach ta wartość może mieć wartość "-1" i ogólnie może być interpretowana jako "nieznana". |
obiekt sources (Delta Lake)
Metryczne | opis |
---|---|
sources.description |
Nazwa źródła odczytywanego zapytania przesyłania strumieniowego. Na przykład “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Wersja serializacji, z którą to przesunięcie jest kodowane. |
sources.[startOffset/endOffset].reservoirId |
Identyfikator odczytywanej tabeli. Służy do wykrywania błędnej konfiguracji podczas ponownego uruchamiania zapytania. |
sources.[startOffset/endOffset].reservoirVersion |
Wersja tabeli, którą obecnie przetwarzasz. |
sources.[startOffset/endOffset].index |
Indeksuj w sekwencji AddFiles w tej wersji. Służy do dzielenia dużych zatwierdzeń na wiele partii. Ten indeks jest tworzony przez sortowanie według modificationTimestamp i path . |
sources.[startOffset/endOffset].isStartingVersion |
Czy to przesunięcie oznacza zapytanie, które jest uruchamiane, a nie przetwarzania zmian. Podczas uruchamiania nowego zapytania wszystkie dane obecne w tabeli na początku są przetwarzane, a następnie nowe dane, które dotarły. |
sources.latestOffset |
Najnowsze przesunięcie przetworzone przez zapytanie mikrobatch. |
sources.numInputRows |
Liczba wierszy wejściowych przetworzonych z tego źródła. |
sources.inputRowsPerSecond |
Szybkość, z jaką dane docierają do przetwarzania dla tego źródła. |
sources.processedRowsPerSecond |
Szybkość przetwarzania danych przez platformę Spark dla tego źródła. |
sources.metrics.numBytesOutstanding |
Łączny rozmiar zaległych plików (plików śledzonych przez bazę danych RocksDB). Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego. |
sources.metrics.numFilesOutstanding |
Liczba zaległych plików do przetworzenia. Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego. |
obiekt ujścia (Delta Lake)
Metryczne | opis |
---|---|
sink.description |
Nazwa ujścia, do którego zapisuje zapytanie przesyłane strumieniowo. Na przykład “DeltaSink[table]” . |
sink.numOutputRows |
Liczba wierszy w tej metryce to "-1", ponieważ platforma Spark nie może wywnioskować wierszy wyjściowych dla ujścia DSv1, czyli klasyfikacji ujścia usługi Delta Lake. |
Przykłady
Przykładowe zdarzenie Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Przykładowe zdarzenie Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Przykładowe źródło współczynnika dla zdarzenia Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}