Monitorowanie zapytań strukturalnego przesyłania strumieniowego w usłudze Azure Databricks

Usługa Azure Databricks zapewnia wbudowane monitorowanie aplikacji do przesyłania strumieniowego ze strukturą za pośrednictwem interfejsu użytkownika platformy Spark na karcie Przesyłanie strumieniowe .

Rozróżnianie zapytań przesyłania strumieniowego ze strukturą w interfejsie użytkownika platformy Spark

Podaj unikatową nazwę zapytania, dodając .queryName(<query-name>) do writeStream kodu, aby łatwo odróżnić metryki, do których należy strumień w interfejsie użytkownika platformy Spark.

Wypychanie metryk przesyłania strumieniowego ze strukturą do usług zewnętrznych

Metryki przesyłania strumieniowego można wypychać do usług zewnętrznych na potrzeby zgłaszania alertów lub pulpitów nawigacyjnych przy użyciu interfejsu odbiornika zapytań przesyłania strumieniowego platformy Apache Spark. W środowisku Databricks Runtime 11.3 LTS i nowszym odbiornik zapytań przesyłania strumieniowego jest dostępny w językach Python i Scala.

Ważne

Nie można używać poświadczeń i obiektów zarządzanych przez wykaz aparatu Unity w StreamingQueryListener logice.

Uwaga

Opóźnienie przetwarzania skojarzone z odbiornikami może mieć negatywny wpływ na przetwarzanie zapytań. Usługa Databricks zaleca zminimalizowanie logiki przetwarzania w tych odbiornikach i zapisywanie w ujściach o małych opóźnieniach, takich jak kafka.

Poniższy kod zawiera podstawowe przykłady składni implementowania odbiornika:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definiowanie obserwowalnych metryk w strumieniu ze strukturą

Obserwowane metryki są nazywane dowolnymi funkcjami agregacji, które można zdefiniować w zapytaniu (DataFrame). Gdy tylko wykonanie ramki danych osiągnie punkt ukończenia (czyli kończy zapytanie wsadowe lub osiąga epokę przesyłania strumieniowego), emitowane jest nazwane zdarzenie zawierające metryki dla danych przetworzonych od ostatniego punktu ukończenia.

Możesz obserwować te metryki, dołączając odbiornik do sesji platformy Spark. Odbiornik zależy od trybu wykonywania:

  • Tryb wsadowy: użyj polecenia QueryExecutionListener.

    QueryExecutionListener jest wywoływana po zakończeniu zapytania. Uzyskaj dostęp do metryk przy użyciu QueryExecution.observedMetrics mapy.

  • Przesyłanie strumieniowe lub mikrosadowe: użyj polecenia StreamingQueryListener.

    StreamingQueryListener jest wywoływana, gdy zapytanie przesyłane strumieniowo kończy epokę. Uzyskaj dostęp do metryk przy użyciu StreamingQueryProgress.observedMetrics mapy. Usługa Azure Databricks nie obsługuje przesyłania strumieniowego ciągłego wykonywania.

Na przykład:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metryki obiektu StreamingQueryListener

Metryczne opis
id Unikatowy identyfikator zapytania, który będzie się powtarzać po ponownym uruchomieniu. Zobacz StreamingQuery.id().
runId Unikatowy identyfikator zapytania dla każdego uruchomienia lub ponownego uruchomienia. Zobacz StreamingQuery.runId().
name Określona przez użytkownika nazwa zapytania. Wartość null, jeśli nie zostanie określona.
timestamp Sygnatura czasowa wykonania mikrosadowej partii.
batchId Unikatowy identyfikator bieżącej partii przetwarzanych danych. Należy pamiętać, że w przypadku ponownych prób po awarii dany identyfikator partii można wykonać więcej niż raz. Podobnie, gdy nie ma danych do przetworzenia, identyfikator partii nie jest zwiększany.
numInputRows Zagregowana (we wszystkich źródłach) liczba rekordów przetworzonych w wyzwalaczu.
inputRowsPerSecond Zagregowany (we wszystkich źródłach) współczynnik przychodzących danych.
processedRowsPerSecond Agregacja (we wszystkich źródłach) szybkość przetwarzania danych przez platformę Spark.

durationMs, obiekt

Informacje o czasie potrzebnym na ukończenie różnych etapów procesu wykonywania mikrosadowego.

Metryczne opis
durationMs.addBatch Czas potrzebny na wykonanie mikrobajtu. Wyklucza to czas, przez który platforma Spark planuje mikrobajt.
durationMs.getBatch Czas potrzebny na pobranie metadanych dotyczących przesunięć ze źródła.
durationMs.latestOffset Najnowsze przesunięcie używane dla mikrobatchu. Ten obiekt postępu odnosi się do czasu potrzebnego na pobranie najnowszego przesunięcia ze źródeł.
durationMs.queryPlanning Czas potrzebny na wygenerowanie planu wykonania.
durationMs.triggerExecution Czas potrzebny na zaplanowanie i wykonanie mikrobatchu.
durationMs.walCommit Czas potrzebny na zatwierdzenie nowych dostępnych przesunięć.

eventTime, obiekt

Informacje o wartości czasu zdarzenia widocznej w danych przetwarzanych w mikrosadowej partii. Te dane są używane przez znak wodny, aby dowiedzieć się, jak przyciąć stan przetwarzania agregacji stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą.

Metryczne opis
eventTime.avg Średni czas zdarzenia widoczny w wyzwalaczu.
eventTime.max Maksymalny czas zdarzenia widoczny w wyzwalaczu.
eventTime.min Minimalny czas zdarzenia widoczny w wyzwalaczu.
eventTime.watermark Wartość znaku wodnego używanego w wyzwalaczu.

stateOperators, obiekt

Informacje o operacjach stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą i agregacjach generowanych z nich.

Metryczne opis
stateOperators.operatorName Nazwa operatora stanowego, do którego odnoszą się metryki. Na przykład , symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Liczba wierszy w stanie w wyniku operatora stanowego lub agregacji.
stateOperators.numRowsUpdated Liczba wierszy zaktualizowanych w stanie w wyniku operatora stanowego lub agregacji.
stateOperators.numRowsRemoved Liczba wierszy usuniętych ze stanu w wyniku operatora stanowego lub agregacji.
stateOperators.commitTimeMs Czas potrzebny na zatwierdzenie wszystkich aktualizacji (umieszcza i usuwa) i zwraca nową wersję.
stateOperators.memoryUsedBytes Pamięć używana przez magazyn stanów.
stateOperators.numRowsDroppedByWatermark Liczba wierszy, które są uznawane za za późno, aby zostały uwzględnione w agregacji stanowej. Tylko agregacje przesyłania strumieniowego: liczba wierszy porzuconych po agregacji, a nie nieprzetworzonych wierszy wejściowych. Liczba nie jest dokładna, ale może wskazywać, że opóźnione dane są usuwane.
stateOperators.numShufflePartitions Liczba partycji mieszania dla tego operatora stanowego.
stateOperators.numStateStoreInstances Rzeczywiste wystąpienie magazynu stanów, które operator zainicjował i konserwował. W wielu operatorach stanowych jest to samo co liczba partycji, ale sprzężenia strumienia inicjuje cztery wystąpienia magazynu stanów na partycję.

stateOperators.customMetrics, obiekt

Informacje zebrane z bazy danych RocksDB, które przechwytują metryki dotyczące wydajności i operacji w odniesieniu do wartości stanowych, które są przechowywane dla zadania przesyłania strumieniowego ze strukturą. Aby uzyskać więcej informacji, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).

Metryczne opis
customMetrics.rocksdbBytesCopied Liczba bajtów skopiowanych jako śledzonych przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Czas w milisekundach, aby utworzyć migawkę natywnej bazy danych RocksDB i zapisać ją w katalogu lokalnym.
customMetrics.rocksdbCompactLatency Czas w milisekundach kompaktowania (opcjonalnie) podczas zatwierdzania punktu kontrolnego.
customMetrics.rocksdbCommitFileSyncLatencyMs Czas w milisekundach w celu zsynchronizowania natywnej migawki bazy danych RocksDB z zewnętrznym magazynem (lokalizacja punktu kontrolnego).
customMetrics.rocksdbCommitFlushLatency Czas w milisekundach opróżniania zmian w pamięci bazy danych RocksDB na dysku lokalnym.
customMetrics.rocksdbCommitPauseLatency Czas w milisekundach, aby zatrzymać wątki procesu roboczego w tle (na przykład w przypadku kompaktowania) w ramach zatwierdzenia punktu kontrolnego.
customMetrics.rocksdbCommitWriteBatchLatency Czas w milisekundach, aby zastosować zapisy etapowe w strukturze w pamięci (WriteBatch) do natywnej bazy danych RocksDB.
customMetrics.rocksdbFilesCopied Liczba plików skopiowanych zgodnie z śledzeniem przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbFilesReused Liczba ponownie użytych plików, które są śledzone przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbGetCount Liczba wywołań get bazy danych (nie obejmuje getsWriteBatchto wsadu w pamięci używanej do przejściowych zapisów).
customMetrics.rocksdbGetLatency Średni czas w nanosekundach dla bazowego wywołania natywnego RocksDB::Get .
customMetrics.rocksdbReadBlockCacheHitCount Ilość pamięci podręcznej blokowej w bazie danych RocksDB jest przydatna lub nie i unika odczytu dysku lokalnego.
customMetrics.rocksdbReadBlockCacheMissCount Ilość pamięci podręcznej blokowej w bazie danych RocksDB jest przydatna lub nie i unika odczytu dysku lokalnego.
customMetrics.rocksdbSstFileSize Rozmiar wszystkich plików SST. SST oznacza statyczną tabelę sortowaną, która jest strukturą tabelaryczna RocksDB używa do przechowywania danych.
customMetrics.rocksdbTotalBytesRead Liczba nieskompresowanych bajtów odczytanych według get operacji.
customMetrics.rocksdbTotalBytesReadByCompaction Liczba bajtów odczytywanych przez proces kompaktowania z dysku.
customMetrics.rocksdbTotalBytesReadThroughIterator Niektóre operacje stanowe (na przykład przetwarzanie limitu czasu i FlatMapGroupsWithState znakowanie wodne) wymagają odczytywania danych w bazie danych za pomocą iteratora. Ta metryka reprezentuje rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora.
customMetrics.rocksdbTotalBytesWritten Liczba nieskompresowanych bajtów zapisanych przez put operacje.
customMetrics.rocksdbTotalBytesWrittenByCompaction Liczba bajtów zapisu procesu kompaktowania na dysku.
customMetrics.rocksdbTotalCompactionLatencyMs Milisekundy czasu dla kompaktacji bazy danych RocksDB, w tym kompaktowania w tle i opcjonalnego kompaktowania zainicjowanego podczas zatwierdzania.
customMetrics.rocksdbTotalFlushLatencyMs Opróżnianie czasu, w tym opróżnianie tła. Operacje opróżniania to procesy, za pomocą których tabela MemTable jest opróżniona do magazynu po jej zapełnieniu. Tabele MemTable to pierwszy poziom, na którym dane są przechowywane w bazie danych RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Menedżer plików rocksDB zarządza fizycznym wykorzystaniem i usunięciem miejsca na dysku pliku SST. Ta metryka reprezentuje nieskompresowane pliki zip w bajtach zgłoszonych przez Menedżera plików.

sources object (Kafka)

Metryczne opis
sources.description Nazwa źródła odczytywanego zapytania przesyłania strumieniowego. Na przykład “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset Obiektu Początkowy numer przesunięcia w temacie platformy Kafka, na który uruchomiono zadanie przesyłania strumieniowego.
sources.endOffset Obiektu Najnowsze przesunięcie przetworzone przez mikrobabajt. Może to być równe latestOffset dla trwającego wykonywania mikrobajta.
sources.latestOffset Obiektu Najnowsze przesunięcie obliczone przez mikrobajt. W przypadku ograniczania przepływności proces mikrosadowania może nie przetwarzać wszystkich przesunięć, powodując różnice ilatestOffset.endOffset
sources.numInputRows Liczba wierszy wejściowych przetworzonych z tego źródła.
sources.inputRowsPerSecond Szybkość, z jaką dane docierają do przetwarzania dla tego źródła.
sources.processedRowsPerSecond Szybkość przetwarzania danych przez platformę Spark dla tego źródła.

sources.metrics, obiekt (Kafka)

Metryczne opis
sources.metrics.avgOffsetsBehindLatest Średnia liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.
sources.metrics.estimatedTotalBytesBehindLatest Szacowana liczba bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów.
sources.metrics.maxOffsetsBehindLatest Maksymalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.
sources.metrics.minOffsetsBehindLatest Minimalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.

obiekt ujścia (Kafka)

Metryczne opis
sink.description Nazwa ujścia, do których zapisuje się zapytanie przesyłane strumieniowo. Na przykład “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Liczba wierszy zapisanych w tabeli wyjściowej lub ujściu w ramach mikrobatchu. W niektórych sytuacjach ta wartość może mieć wartość "-1" i ogólnie może być interpretowana jako "nieznana".

obiekt sources (Delta Lake)

Metryczne opis
sources.description Nazwa źródła odczytywanego zapytania przesyłania strumieniowego. Na przykład “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Wersja serializacji, z którą to przesunięcie jest kodowane.
sources.[startOffset/endOffset].reservoirId Identyfikator odczytywanej tabeli. Służy do wykrywania błędnej konfiguracji podczas ponownego uruchamiania zapytania.
sources.[startOffset/endOffset].reservoirVersion Wersja tabeli, którą obecnie przetwarzasz.
sources.[startOffset/endOffset].index Indeksuj w sekwencji AddFiles w tej wersji. Służy do dzielenia dużych zatwierdzeń na wiele partii. Ten indeks jest tworzony przez sortowanie według modificationTimestamp i path.
sources.[startOffset/endOffset].isStartingVersion Czy to przesunięcie oznacza zapytanie, które jest uruchamiane, a nie przetwarzania zmian. Podczas uruchamiania nowego zapytania wszystkie dane obecne w tabeli na początku są przetwarzane, a następnie nowe dane, które dotarły.
sources.latestOffset Najnowsze przesunięcie przetworzone przez zapytanie mikrobatch.
sources.numInputRows Liczba wierszy wejściowych przetworzonych z tego źródła.
sources.inputRowsPerSecond Szybkość, z jaką dane docierają do przetwarzania dla tego źródła.
sources.processedRowsPerSecond Szybkość przetwarzania danych przez platformę Spark dla tego źródła.
sources.metrics.numBytesOutstanding Łączny rozmiar zaległych plików (plików śledzonych przez bazę danych RocksDB). Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego.
sources.metrics.numFilesOutstanding Liczba zaległych plików do przetworzenia. Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego.

obiekt ujścia (Delta Lake)

Metryczne opis
sink.description Nazwa ujścia, do którego zapisuje zapytanie przesyłane strumieniowo. Na przykład “DeltaSink[table]”.
sink.numOutputRows Liczba wierszy w tej metryce to "-1", ponieważ platforma Spark nie może wywnioskować wierszy wyjściowych dla ujścia DSv1, czyli klasyfikacji ujścia usługi Delta Lake.

Przykłady

Przykładowe zdarzenie Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Przykładowe zdarzenie Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Przykładowe źródło współczynnika dla zdarzenia Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}