Compartir a través de


Supervisión de consultas de Structured Streaming en Azure Databricks

Azure Databricks proporciona una supervisión integrada para las aplicaciones de Structured Streaming mediante la interfaz de usuario de Spark de la pestaña Streaming.

Cómo distinguir consultas de Structured Streaming en la UI de Spark

Para distinguir las métricas pertenecen a cada secuencia de manera sencilla mediante la UI de Spark, proporcione un nombre de consulta único a los flujos. Para ello, agregue el elemento .queryName(<query-name>) al código writeStream.

Inserción de métricas de Structured Streaming en servicios externos

Las métricas de streaming se pueden insertar en servicios externos para alertas o casos de uso de paneles mediante la interfaz del cliente de escucha de consultas de streaming de Apache Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, el cliente de escucha de consultas de streaming está disponible en Python y Scala.

Importante

Las credenciales y los objetos administrados por Unity Catalog no se pueden usar en la lógica de StreamingQueryListener.

Nota:

La latencia de procesamiento con agentes de escucha puede afectar significativamente a las velocidades de procesamiento de consultas. Se recomienda limitar la lógica de procesamiento en estos agentes de escucha y optar por escribir en sistemas de respuesta rápida como Kafka para mejorar la eficacia.

En el código siguiente se proporcionan ejemplos básicos de la sintaxis para implementar un cliente de escucha:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definición de métricas observables en Structured Streaming

Las métricas observables son funciones de agregado arbitrarias con nombre que se pueden definir en una consulta (DataFrame). En cuanto la ejecución de un DataFrame alcanza un punto de finalización (es decir, finaliza una consulta por lotes o llega a una época de streaming), se emite un evento con nombre que contiene las métricas de los datos procesados desde el último punto de finalización.

Para observar estas métricas, adjunte un cliente de escucha a la sesión de Spark. El cliente de escucha que debe usar dependerá del modo de ejecución:

  • Modo por lotes: use el cliente QueryExecutionListener.

    El agente QueryExecutionListener recibe una llamada cuando se completa una consulta. Acceda a las métricas mediante el mapa QueryExecution.observedMetrics.

  • Streaming o microlote: use el agente StreamingQueryListener.

    El agente StreamingQueryListener recibe una llamada cuando la consulta de streaming completa una época. Acceda a las métricas mediante el mapa StreamingQueryProgress.observedMetrics. La plataforma Azure Databricks no admite el streaming de ejecución continua.

Por ejemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Métricas del objeto StreamingQueryListener

Métrica Descripción
id Una id. de consulta único que persiste en los reinicios.
runId Identificador de consulta único para cada inicio o reinicio. Consulte StreamingQuery.runId().
name El nombre especificado por el usuario de la consulta. El nombre es NULL si no se especifica ningún nombre.
timestamp La marca de tiempo para la ejecución del microlote.
batchId Una iId. único del lote actual de datos que se está procesando. En el caso de reintentos tras un error, se puede ejecutar un id. de lote determinado más de una vez. Del mismo modo, cuando no hay datos que se van a procesar, el id. de lote no se incrementa.
numInputRows El número agregado (en todos los orígenes) de registros procesados en un desencadenador.
inputRowsPerSecond La tasa de agregado (en todos los orígenes) de los datos que llegan.
processedRowsPerSecond La tasa de agregado (en todos los orígenes) a la que Spark está procesando datos.

objeto durationMs

Información sobre el tiempo necesario para completar varias fases del proceso de ejecución de microlote.

Métrica Descripción
durationMs.addBatch El tiempo necesario para ejecutar el microlote. Esto excluye el tiempo que tarda Spark en planear el microlote.
durationMs.getBatch El tiempo necesario para recuperar los metadatos sobre los desplazamientos del origen.
durationMs.latestOffset El desplazamiento más reciente consumido para el microlote. Este objeto de progreso hace referencia al tiempo necesario para recuperar el desplazamiento más reciente de los orígenes.
durationMs.queryPlanning El tiempo necesario para generar el plan de ejecución.
durationMs.triggerExecution El tiempo necesario para planear y ejecutar el microlote.
durationMs.walCommit El tiempo necesario para confirmar los nuevos desplazamientos disponibles.

objeto eventTime

Información sobre el valor del tiempo del evento visto dentro de los datos que se procesan en el microlote. Esta marca de agua usa estos datos para averiguar cómo recortar el estado para procesar agregaciones con estado definidas en el trabajo de flujo estructurado.

Métrica Descripción
eventTime.avg El promedio de tiempo del evento visto en el desencadenador.
eventTime.max El tiempo máximo del evento visto en el desencadenador.
eventTime.min El tiempo mínimo del evento visto en el desencadenador.
eventTime.watermark El valor de la marca de agua utilizada en el desencadenador.

objeto stateOperators

Información sobre las operaciones con estado definidas en el trabajo de flujo estructurado y las agregaciones que se generan a partir de ellas.

Métrica Descripción
stateOperators.operatorName El nombre del operador con estado al que se refieren las métricas, comosymmetricHashJoin, dedupe y stateStoreSave.
stateOperators.numRowsTotal El número total de filas en el estado como resultado del operador o agregación con estado.
stateOperators.numRowsUpdated El número total de filas actualizadas en el estado como resultado del operador o agregación con estado.
stateOperators.allUpdatesTimeMs Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones.
stateOperators.numRowsRemoved El número total de filas quitadas del estado como resultado del operador o agregación con estado.
stateOperators.allRemovalsTimeMs Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones.
stateOperators.commitTimeMs El tiempo necesario para confirmar todas las actualizaciones (colocaciones y eliminaciones) y devolver una nueva versión.
stateOperators.memoryUsedBytes Memoria usada por el almacén de estado.
stateOperators.numRowsDroppedByWatermark El número de filas que se consideran demasiado tardías para incluirse en una agregación con estado. Agregaciones de streaming solo: el número de filas eliminadas después de la agregación (y no las filas de entrada sin procesar). Este número no es preciso, pero proporciona una indicación de que hay datos en tiempo de espera que se quiten.
stateOperators.numShufflePartitions El número de particiones aleatorias para este operador con estado.
stateOperators.numStateStoreInstances La instancia de almacén de estado real que el operador ha inicializado y mantenido. Para muchos operadores con estado, este es el mismo que el número de particiones. Sin embargo, las combinaciones stream-stream inicializan cuatro instancias de almacén de estado por partición.

objeto stateOperators.customMetrics

Información recopilada de RocksDB que captura métricas sobre su rendimiento y operaciones con respecto a los valores con estado que mantiene para el trabajo de flujo estructurado. Para más información, consulte Configuración del almacén de estado de RocksDB en Azure Databricks.

Métrica Descripción
customMetrics.rocksdbBytesCopied El número de bytes copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbCommitCheckpointLatency El tiempo en milisegundos para tomar una instantánea de RocksDB nativo y escribirlo en un directorio local.
customMetrics.rocksdbCompactLatency El tiempo en milisegundos para realizar la compactación (opcional) durante la confirmación del punto de comprobación.
customMetrics.rocksdbCommitFileSyncLatencyMs El tiempo en milisegundos que sincroniza la instantánea nativa de RocksDB con el almacenamiento externo (la ubicación del punto de control).
customMetrics.rocksdbCommitFlushLatency El tiempo en milisegundos que vacía los cambios en memoria de RocksDB en memoria en el disco local.
customMetrics.rocksdbCommitPauseLatency Tiempo en milisegundos que detiene los subprocesos de trabajo en segundo plano como parte de la confirmación del punto de comprobación, como la compactación.
customMetrics.rocksdbCommitWriteBatchLatency El tiempo en milisegundos aplicando las escrituras almacenadas provisionalmente en la estructura en memoria (WriteBatch) a RocksDB nativa.
customMetrics.rocksdbFilesCopied El número de archivos copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbFilesReused El número de archivos reutilizados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbGetCount El número de llamadas get a la base de datos (no incluye gets de WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional).
customMetrics.rocksdbGetLatency El promedio de tiempo en nanosegundos para la llamada nativa RocksDB::Get subyacente.
customMetrics.rocksdbReadBlockCacheHitCount Recuento de aciertos de caché de la caché de bloques en RocksDB que son útiles para evitar las lecturas del disco local.
customMetrics.rocksdbReadBlockCacheMissCount El recuento de la caché de bloques en RocksDB no es útil para evitar lecturas de disco local.
customMetrics.rocksdbSstFileSize Tamaño de todos los archivos de tabla ordenada estática (SST): la estructura tabular RocksDB usa para almacenar datos.
customMetrics.rocksdbTotalBytesRead El número de bytes sin comprimir leídos por operaciones de get.
customMetrics.rocksdbTotalBytesReadByCompaction El número de bytes de lectura del proceso de compactación del disco.
customMetrics.rocksdbTotalBytesReadThroughIterator El número de bytes totales de los datos sin comprimir leídos mediante un iterador. Algunas operaciones con estado (por ejemplo, el procesamiento de tiempo de espera en FlatMapGroupsWithState y la marca de agua) requieren leer datos en la base de datos a través de un iterador.
customMetrics.rocksdbTotalBytesWritten El número total de bytes sin comprimir escritos por operaciones de put.
customMetrics.rocksdbTotalBytesWrittenByCompaction El número total de bytes que escribe el proceso de compactación en el disco.
customMetrics.rocksdbTotalCompactionLatencyMs El tiempo en milisegundos para compactaciones de RocksDB, incluidas las compactaciones en segundo plano y la compactación opcional iniciada durante la confirmación.
customMetrics.rocksdbTotalFlushLatencyMs El tiempo total de vaciado, incluido el vaciado en segundo plano. Las operaciones de vaciado son procesos por los que MemTable se vacía en el almacenamiento una vez que está lleno. MemTables son el primer nivel donde se almacenan los datos en RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Tamaño en bytes de los archivos ZIP sin comprimir, tal como lo informa el Administrador de archivos. El administrador de archivos administra el uso y eliminación del espacio en disco del archivo SST físico.

objeto de origen (Kafka)

Métrica Descripción
sources.description Descripción detallada del origen de Kafka, especificando el tema exacto de Kafka que se lee. Por ejemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Objecto sources.startOffset El número de desplazamiento inicial en el tema de Kafka en el que se inició el trabajo de streaming.
Objecto sources.endOffset El desplazamiento más reciente procesado por el microlote. Esto podría ser igual a latestOffset para una ejecución de microlote en curso.
Objecto sources.latestOffset El desplazamiento más reciente calculado por el microlote. Es posible que el proceso de microbatching no procese todos los desplazamientos cuando hay limitación, lo que da como resultado la diferenciación de endOffset y latestOffset.
sources.numInputRows El número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond La tasa a la que llegan los datos para su procesamiento desde este origen.
sources.processedRowsPerSecond La tasa a la que Spark está procesando datos desde este origen.

objeto sources.metrics (Kafka)

Métrica Descripción
sources.metrics.avgOffsetsBehindLatest El promedio del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.estimatedTotalBytesBehindLatest El número estimado de bytes que el proceso de consulta no ha consumido de los temas suscritos.
sources.metrics.maxOffsetsBehindLatest El número máximo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.minOffsetsBehindLatest El número mínimo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.

objeto receptor (Kafka)

Métrica Descripción
sink.description La descripción del receptor de Kafka en el que está escribiendo la consulta de streaming, detallando la implementación específica del receptor de Kafka que se está usando. Por ejemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows El número de filas escritas en la tabla o receptor de salida como parte del microlote. En algunas situaciones, este valor puede ser "-1" y, por lo general, se puede interpretar como "desconocido".

objeto de origen (Delta Lake)

Métrica Descripción
sources.description La descripción del origen desde el que se lee la consulta de streaming. Por ejemplo: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion La versión de la serialización con la que se codifica este desplazamiento.
sources.[startOffset/endOffset].reservoirId El id. de la tabla que se leerá. Se usa para detectar errores de configuración al reiniciar una consulta.
sources.[startOffset/endOffset].reservoirVersion La versión de la tabla que está procesando actualmente.
sources.[startOffset/endOffset].index El índice en la secuencia de AddFiles en esta versión. Esto se usa para dividir las confirmaciones grandes en varios lotes. Este índice se crea ordenando en modificationTimestamp y path.
sources.[startOffset/endOffset].isStartingVersion Identifica si el desplazamiento actual marca el inicio de una nueva consulta de streaming en lugar del procesamiento de los cambios que se produjeron después de procesar los datos iniciales. Al iniciar una nueva consulta, se procesan todos los datos presentes en la tabla al principio y, a continuación, los nuevos datos que hayan llegado.
sources.latestOffset El desplazamiento más reciente procesado por la consulta del microlote.
sources.numInputRows El número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond La tasa a la que llegan los datos para su procesamiento desde este origen.
sources.processedRowsPerSecond La tasa a la que Spark está procesando datos desde este origen.
sources.metrics.numBytesOutstanding El tamaño combinado de los archivos pendientes (archivos rastreados por RocksDB). Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.
sources.metrics.numFilesOutstanding El número de archivos pendientes que se van a procesar. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.

objeto receptor (Delta Lake)

Métrica Descripción
sink.description Descripción del receptor Delta, que detalla la implementación específica del receptor delta que se usa. Por ejemplo: “DeltaSink[table]”.
sink.numOutputRows El número de filas es siempre "-1" porque Spark no puede inferir filas de salida para receptores DSv1, que es la clasificación para el receptor de Delta Lake.

Ejemplos

Ejemplo de evento StreamingQueryListener de Kafka a Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Ejemplo de evento StreamingQueryListener de Delta Lake a Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Ejemplo de evento StreamingQueryListener de Kinesis a Delta Lake

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Ejemplo de evento StreamingQueryListener de Kafka a Delta Lake

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Ejemplo de origen de tasa para el evento StreamingQueryListener de Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}