Supervisión de consultas de Structured Streaming en Azure Databricks

Azure Databricks proporciona una supervisión integrada para las aplicaciones de Structured Streaming mediante la interfaz de usuario de Spark de la pestaña Streaming.

Cómo distinguir consultas de Structured Streaming en la UI de Spark

Para distinguir las métricas pertenecen a cada secuencia de manera sencilla mediante la UI de Spark, proporcione un nombre de consulta único a los flujos. Para ello, agregue el elemento .queryName(<query-name>) al código writeStream.

Inserción de métricas de Structured Streaming en servicios externos

Las métricas de streaming se pueden insertar en servicios externos para alertas o casos de uso de paneles mediante la interfaz del cliente de escucha de consultas de streaming de Apache Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, el cliente de escucha de consultas de streaming está disponible en Python y Scala.

Importante

Las credenciales y los objetos administrados por Unity Catalog no se pueden usar en la lógica de StreamingQueryListener.

Nota:

La latencia de procesamiento asociada a los clientes de escucha puede afectar negativamente al procesamiento de consultas. Databricks recomienda minimizar la lógica de procesamiento en estos clientes de escucha y escribir en receptores de baja latencia, como Kafka.

En el código siguiente se proporcionan ejemplos básicos de la sintaxis para implementar un cliente de escucha:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definición de métricas observables en Structured Streaming

Las métricas observables son funciones de agregado arbitrarias con nombre que se pueden definir en una consulta (DataFrame). En cuanto la ejecución de un DataFrame alcanza un punto de finalización (es decir, finaliza una consulta por lotes o llega a una época de streaming), se emite un evento con nombre que contiene las métricas de los datos procesados desde el último punto de finalización.

Para observar estas métricas, adjunte un cliente de escucha a la sesión de Spark. El cliente de escucha que debe usar dependerá del modo de ejecución:

  • Modo por lotes: use el cliente QueryExecutionListener.

    El agente QueryExecutionListener recibe una llamada cuando se completa una consulta. Acceda a las métricas mediante el mapa QueryExecution.observedMetrics.

  • Streaming o microlote: use el agente StreamingQueryListener.

    El agente StreamingQueryListener recibe una llamada cuando la consulta de streaming completa una época. Acceda a las métricas mediante el mapa StreamingQueryProgress.observedMetrics. La plataforma Azure Databricks no admite el streaming de ejecución continua.

Por ejemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Métricas del objeto StreamingQueryListener

Métrica Descripción
id Id. de consulta único que persiste en los reinicios. Consulte StreamingQuery.id().
runId Id. de consulta único para cada inicio o reinicio. Consulte StreamingQuery.runId().
name Nombre especificado por el usuario de la consulta. Null si no se especifica.
timestamp Marca de tiempo para la ejecución del microlote.
batchId Id. único del lote actual de datos que se está procesando. Tenga en cuenta que, en el caso de reintentos tras un error, se puede ejecutar un id. de lote determinado más de una vez. Del mismo modo, cuando no hay datos que se van a procesar, el id. de lote no se incrementa.
numInputRows Número agregado (en todos los orígenes) de registros procesados en un desencadenador.
inputRowsPerSecond Tasa de agregado (en todos los orígenes) de los datos que llegan.
processedRowsPerSecond Tasa de agregado (en todos los orígenes) a la que Spark está procesando datos.

objeto durationMs

Información sobre el tiempo necesario para completar varias fases del proceso de ejecución de microlote.

Métrica Descripción
durationMs.addBatch Tiempo necesario para ejecutar el microlote. Esto excluye el tiempo que tarda Spark en planear el microlote.
durationMs.getBatch Tiempo necesario para recuperar los metadatos sobre los desplazamientos del origen.
durationMs.latestOffset Desplazamiento más reciente consumido para el microlote. Este objeto de progreso hace referencia al tiempo necesario para recuperar el desplazamiento más reciente de los orígenes.
durationMs.queryPlanning Tiempo necesario para generar el plan de ejecución.
durationMs.triggerExecution Tiempo necesario para planear y ejecutar el microlote.
durationMs.walCommit Tiempo necesario para confirmar los nuevos desplazamientos disponibles.

objeto eventTime

Información sobre el valor del tiempo del evento visto dentro de los datos que se procesan en el microlote. Esta marca de agua usa estos datos para averiguar cómo recortar el estado para procesar agregaciones con estado definidas en el trabajo de flujo estructurado.

Métrica Descripción
eventTime.avg Promedio de tiempo del evento visto en el desencadenador.
eventTime.max Tiempo máximo del evento visto en el desencadenador.
eventTime.min Tiempo mínimo del evento visto en el desencadenador.
eventTime.watermark Valor de la marca de agua utilizada en el desencadenador.

objeto stateOperators

Información sobre las operaciones con estado definidas en el trabajo de flujo estructurado y las agregaciones que se generan a partir de ellas.

Métrica Descripción
stateOperators.operatorName Nombre del operador con estado al que se refieren las métricas. Por ejemplo: symmetricHashJoin, dedupe y stateStoreSave.
stateOperators.numRowsTotal Número de filas en el estado como resultado del operador o agregación con estado.
stateOperators.numRowsUpdated Número de filas actualizadas en el estado como resultado del operador o agregación con estado.
stateOperators.numRowsRemoved Número de filas quitadas del estado como resultado del operador o agregación con estado.
stateOperators.commitTimeMs Tiempo necesario para confirmar todas las actualizaciones (colocaciones y eliminaciones) y devolver una nueva versión.
stateOperators.memoryUsedBytes Memoria usada por el almacén de estado.
stateOperators.numRowsDroppedByWatermark Número de filas que se consideran demasiado tardías para incluirse en la agregación con estado. Agregaciones de streaming solo: número de filas eliminadas después de la agregación y no las filas de entrada sin procesar. El número no es preciso, pero puede indicar que se quitan los datos atrasados.
stateOperators.numShufflePartitions Número de particiones aleatorias para este operador con estado.
stateOperators.numStateStoreInstances Instancia de almacén de estado real que el operador ha inicializado y mantenido. En muchos operadores con estado, este es el mismo que el número de particiones, pero la combinación de flujo inicializa cuatro instancias de almacén de estado por partición.

objeto stateOperators.customMetrics

Información recopilada de RocksDB que captura métricas sobre su rendimiento y operaciones con respecto a los valores con estado que mantiene para el trabajo de flujo estructurado. Para más información, consulte Configuración del almacén de estado de RocksDB en Azure Databricks.

Métrica Descripción
customMetrics.rocksdbBytesCopied Número de bytes copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Tiempo en milisegundos para tomar una instantánea de RocksDB nativo y escribirlo en un directorio local.
customMetrics.rocksdbCompactLatency El tiempo en milisegundos para realizar la compactación (opcional) durante la confirmación del punto de comprobación.
customMetrics.rocksdbCommitFileSyncLatencyMs El tiempo en milisegundos para sincronizar los archivos nativos relacionados con una instantánea de RocksDB con un almacenamiento externo (ubicación del punto de comprobación).
customMetrics.rocksdbCommitFlushLatency Tiempo en milisegundos para vaciar los cambios en la memoria de RocksDB en el disco local.
customMetrics.rocksdbCommitPauseLatency Tiempo en milisegundos para detener los subprocesos de trabajo en segundo plano (por ejemplo, para la compactación) como parte de la confirmación del punto de comprobación.
customMetrics.rocksdbCommitWriteBatchLatency Tiempo en milisegundos para aplicar las escrituras de almacenamiento provisional en la estructura en memoria (WriteBatch) a RocksDB nativo.
customMetrics.rocksdbFilesCopied Número de archivos copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbFilesReused Número de archivos reutilizados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbGetCount Número de llamadas get a la base de datos (esto no incluye gets de WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional).
customMetrics.rocksdbGetLatency Promedio de tiempo en nanosegundos para la llamada nativa RocksDB::Get subyacente.
customMetrics.rocksdbReadBlockCacheHitCount Cuánto de la caché de bloques en RocksDB es útil o no y cómo evita las lecturas de disco local.
customMetrics.rocksdbReadBlockCacheMissCount Cuánto de la caché de bloques en RocksDB es útil o no y cómo evita las lecturas de disco local.
customMetrics.rocksdbSstFileSize Tamaño de todos los archivos SST. SST significa Tabla ordenada estática, que es la estructura tabular que RocksDB usa para almacenar datos.
customMetrics.rocksdbTotalBytesRead Número de bytes sin comprimir leídos por operaciones de get.
customMetrics.rocksdbTotalBytesReadByCompaction Número de bytes de lectura del proceso de compactación del disco.
customMetrics.rocksdbTotalBytesReadThroughIterator Algunas de las operaciones con estado (por ejemplo, el procesamiento de tiempo de espera en FlatMapGroupsWithState y la marca de agua) requieren leer datos en la base de datos a través de un iterador. Esta métrica representa el tamaño de los datos sin comprimir leídos mediante el iterador.
customMetrics.rocksdbTotalBytesWritten Número de bytes sin comprimir escritos por operaciones de put.
customMetrics.rocksdbTotalBytesWrittenByCompaction Número de bytes que escribe el proceso de compactación en el disco.
customMetrics.rocksdbTotalCompactionLatencyMs Tiempo en milisegundos para compactaciones de RocksDB, incluidas las compactaciones en segundo plano y la compactación opcional iniciada durante la confirmación.
customMetrics.rocksdbTotalFlushLatencyMs Tiempo de vaciado, incluido el vaciado en segundo plano. Las operaciones de vaciado son procesos por los que MemTable se vacía en el almacenamiento una vez que está lleno. MemTables son el primer nivel donde se almacenan los datos en RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed El administrador de archivos de RocksDB administra el uso y eliminación del espacio en disco del archivo SST físico. Esta métrica representa los archivos ZIP sin comprimir en bytes según lo notificado por el administrador de archivos.

objeto de origen (Kafka)

Métrica Descripción
sources.description Nombre del origen desde el que se lee la consulta de streaming. Por ejemplo, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Objecto sources.startOffset Número de desplazamiento inicial en el tema de Kafka en el que se inició el trabajo de streaming.
Objecto sources.endOffset Desplazamiento más reciente procesado por el microlote. Esto podría ser igual a latestOffset para una ejecución de microlote en curso.
Objecto sources.latestOffset Desplazamiento más reciente calculado por el microlote. Cuando hay limitaciones, es posible que el proceso de microlote no procese todos los desplazamientos, lo que provoca que endOffset y latestOffset difieran.
sources.numInputRows Número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond Tasa a la que llegan los datos para su procesamiento para este origen.
sources.processedRowsPerSecond Tasa a la que Spark está procesando datos para este origen.

objeto sources.metrics (Kafka)

Métrica Descripción
sources.metrics.avgOffsetsBehindLatest Promedio del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.estimatedTotalBytesBehindLatest Número estimado de bytes que el proceso de consulta no ha consumido de los temas suscritos.
sources.metrics.maxOffsetsBehindLatest Número máximo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.minOffsetsBehindLatest Número mínimo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.

objeto receptor (Kafka)

Métrica Descripción
sink.description Nombre del receptor en el que está escribiendo la consulta de streaming. Por ejemplo, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Número de filas escritas en la tabla o receptor de salida como parte del microlote. En algunas situaciones, este valor puede ser "-1" y, por lo general, se puede interpretar como "desconocido".

objeto de origen (Delta Lake)

Métrica Descripción
sources.description Nombre del origen desde el que se lee la consulta de streaming. Por ejemplo, “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Versión de la serialización con la que se codifica este desplazamiento.
sources.[startOffset/endOffset].reservoirId Id. de la tabla de la que se está leyendo. Se usa para detectar errores de configuración al reiniciar una consulta.
sources.[startOffset/endOffset].reservoirVersion Versión de la tabla que está procesando actualmente.
sources.[startOffset/endOffset].index Índice en la secuencia de AddFiles en esta versión. Esto se usa para dividir las confirmaciones grandes en varios lotes. Este índice se crea ordenando en modificationTimestamp y path.
sources.[startOffset/endOffset].isStartingVersion Si este desplazamiento indica una consulta que se está iniciando en lugar de procesar cambios. Al iniciar una nueva consulta, se procesan todos los datos presentes en la tabla al principio y, a continuación, los nuevos datos que hayan llegado.
sources.latestOffset Desplazamiento más reciente procesado por la consulta del microlote.
sources.numInputRows Número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond Tasa a la que llegan los datos para su procesamiento para este origen.
sources.processedRowsPerSecond Tasa a la que Spark está procesando datos para este origen.
sources.metrics.numBytesOutstanding Tamaño de los archivos pendientes (archivos rastreados por RocksDB) combinados. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.
sources.metrics.numFilesOutstanding Número de archivos pendientes que se van a procesar. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.

objeto receptor (Delta Lake)

Métrica Descripción
sink.description Nombre del receptor en el que escribe la consulta de streaming. Por ejemplo, “DeltaSink[table]”.
sink.numOutputRows El número de filas de esta métrica es "-1" porque Spark no puede inferir filas de salida para receptores DSv1, que es la clasificación para el receptor de Delta Lake.

Ejemplos

Ejemplo de evento StreamingQueryListener de Kafka a Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Ejemplo de evento StreamingQueryListener de Delta Lake a Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Ejemplo de origen de tasa para el evento StreamingQueryListener de Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}