Supervisión de consultas de Structured Streaming en Azure Databricks
Azure Databricks proporciona una supervisión integrada para las aplicaciones de Structured Streaming mediante la interfaz de usuario de Spark de la pestaña Streaming.
Cómo distinguir consultas de Structured Streaming en la UI de Spark
Para distinguir las métricas pertenecen a cada secuencia de manera sencilla mediante la UI de Spark, proporcione un nombre de consulta único a los flujos. Para ello, agregue el elemento .queryName(<query-name>)
al código writeStream
.
Inserción de métricas de Structured Streaming en servicios externos
Las métricas de streaming se pueden insertar en servicios externos para alertas o casos de uso de paneles mediante la interfaz del cliente de escucha de consultas de streaming de Apache Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, el cliente de escucha de consultas de streaming está disponible en Python y Scala.
Importante
Las credenciales y los objetos administrados por Unity Catalog no se pueden usar en la lógica de StreamingQueryListener
.
Nota:
La latencia de procesamiento con agentes de escucha puede afectar significativamente a las velocidades de procesamiento de consultas. Se recomienda limitar la lógica de procesamiento en estos agentes de escucha y optar por escribir en sistemas de respuesta rápida como Kafka para mejorar la eficacia.
En el código siguiente se proporcionan ejemplos básicos de la sintaxis para implementar un cliente de escucha:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definición de métricas observables en Structured Streaming
Las métricas observables son funciones de agregado arbitrarias con nombre que se pueden definir en una consulta (DataFrame). En cuanto la ejecución de un DataFrame alcanza un punto de finalización (es decir, finaliza una consulta por lotes o llega a una época de streaming), se emite un evento con nombre que contiene las métricas de los datos procesados desde el último punto de finalización.
Para observar estas métricas, adjunte un cliente de escucha a la sesión de Spark. El cliente de escucha que debe usar dependerá del modo de ejecución:
Modo por lotes: use el cliente
QueryExecutionListener
.El agente
QueryExecutionListener
recibe una llamada cuando se completa una consulta. Acceda a las métricas mediante el mapaQueryExecution.observedMetrics
.Streaming o microlote: use el agente
StreamingQueryListener
.El agente
StreamingQueryListener
recibe una llamada cuando la consulta de streaming completa una época. Acceda a las métricas mediante el mapaStreamingQueryProgress.observedMetrics
. La plataforma Azure Databricks no admite el streaming de ejecución continua.
Por ejemplo:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Métricas del objeto StreamingQueryListener
Métrica | Descripción |
---|---|
id |
Una id. de consulta único que persiste en los reinicios. |
runId |
Identificador de consulta único para cada inicio o reinicio. Consulte StreamingQuery.runId(). |
name |
El nombre especificado por el usuario de la consulta. El nombre es NULL si no se especifica ningún nombre. |
timestamp |
La marca de tiempo para la ejecución del microlote. |
batchId |
Una iId. único del lote actual de datos que se está procesando. En el caso de reintentos tras un error, se puede ejecutar un id. de lote determinado más de una vez. Del mismo modo, cuando no hay datos que se van a procesar, el id. de lote no se incrementa. |
numInputRows |
El número agregado (en todos los orígenes) de registros procesados en un desencadenador. |
inputRowsPerSecond |
La tasa de agregado (en todos los orígenes) de los datos que llegan. |
processedRowsPerSecond |
La tasa de agregado (en todos los orígenes) a la que Spark está procesando datos. |
objeto durationMs
Información sobre el tiempo necesario para completar varias fases del proceso de ejecución de microlote.
Métrica | Descripción |
---|---|
durationMs.addBatch |
El tiempo necesario para ejecutar el microlote. Esto excluye el tiempo que tarda Spark en planear el microlote. |
durationMs.getBatch |
El tiempo necesario para recuperar los metadatos sobre los desplazamientos del origen. |
durationMs.latestOffset |
El desplazamiento más reciente consumido para el microlote. Este objeto de progreso hace referencia al tiempo necesario para recuperar el desplazamiento más reciente de los orígenes. |
durationMs.queryPlanning |
El tiempo necesario para generar el plan de ejecución. |
durationMs.triggerExecution |
El tiempo necesario para planear y ejecutar el microlote. |
durationMs.walCommit |
El tiempo necesario para confirmar los nuevos desplazamientos disponibles. |
objeto eventTime
Información sobre el valor del tiempo del evento visto dentro de los datos que se procesan en el microlote. Esta marca de agua usa estos datos para averiguar cómo recortar el estado para procesar agregaciones con estado definidas en el trabajo de flujo estructurado.
Métrica | Descripción |
---|---|
eventTime.avg |
El promedio de tiempo del evento visto en el desencadenador. |
eventTime.max |
El tiempo máximo del evento visto en el desencadenador. |
eventTime.min |
El tiempo mínimo del evento visto en el desencadenador. |
eventTime.watermark |
El valor de la marca de agua utilizada en el desencadenador. |
objeto stateOperators
Información sobre las operaciones con estado definidas en el trabajo de flujo estructurado y las agregaciones que se generan a partir de ellas.
Métrica | Descripción |
---|---|
stateOperators.operatorName |
El nombre del operador con estado al que se refieren las métricas, comosymmetricHashJoin , dedupe y stateStoreSave . |
stateOperators.numRowsTotal |
El número total de filas en el estado como resultado del operador o agregación con estado. |
stateOperators.numRowsUpdated |
El número total de filas actualizadas en el estado como resultado del operador o agregación con estado. |
stateOperators.allUpdatesTimeMs |
Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones. |
stateOperators.numRowsRemoved |
El número total de filas quitadas del estado como resultado del operador o agregación con estado. |
stateOperators.allRemovalsTimeMs |
Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones. |
stateOperators.commitTimeMs |
El tiempo necesario para confirmar todas las actualizaciones (colocaciones y eliminaciones) y devolver una nueva versión. |
stateOperators.memoryUsedBytes |
Memoria usada por el almacén de estado. |
stateOperators.numRowsDroppedByWatermark |
El número de filas que se consideran demasiado tardías para incluirse en una agregación con estado. Agregaciones de streaming solo: el número de filas eliminadas después de la agregación (y no las filas de entrada sin procesar). Este número no es preciso, pero proporciona una indicación de que hay datos en tiempo de espera que se quiten. |
stateOperators.numShufflePartitions |
El número de particiones aleatorias para este operador con estado. |
stateOperators.numStateStoreInstances |
La instancia de almacén de estado real que el operador ha inicializado y mantenido. Para muchos operadores con estado, este es el mismo que el número de particiones. Sin embargo, las combinaciones stream-stream inicializan cuatro instancias de almacén de estado por partición. |
objeto stateOperators.customMetrics
Información recopilada de RocksDB que captura métricas sobre su rendimiento y operaciones con respecto a los valores con estado que mantiene para el trabajo de flujo estructurado. Para más información, consulte Configuración del almacén de estado de RocksDB en Azure Databricks.
Métrica | Descripción |
---|---|
customMetrics.rocksdbBytesCopied |
El número de bytes copiados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
El tiempo en milisegundos para tomar una instantánea de RocksDB nativo y escribirlo en un directorio local. |
customMetrics.rocksdbCompactLatency |
El tiempo en milisegundos para realizar la compactación (opcional) durante la confirmación del punto de comprobación. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
El tiempo en milisegundos que sincroniza la instantánea nativa de RocksDB con el almacenamiento externo (la ubicación del punto de control). |
customMetrics.rocksdbCommitFlushLatency |
El tiempo en milisegundos que vacía los cambios en memoria de RocksDB en memoria en el disco local. |
customMetrics.rocksdbCommitPauseLatency |
Tiempo en milisegundos que detiene los subprocesos de trabajo en segundo plano como parte de la confirmación del punto de comprobación, como la compactación. |
customMetrics.rocksdbCommitWriteBatchLatency |
El tiempo en milisegundos aplicando las escrituras almacenadas provisionalmente en la estructura en memoria (WriteBatch ) a RocksDB nativa. |
customMetrics.rocksdbFilesCopied |
El número de archivos copiados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbFilesReused |
El número de archivos reutilizados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbGetCount |
El número de llamadas get a la base de datos (no incluye gets de WriteBatch : en el lote de memoria usado para escrituras de almacenamiento provisional). |
customMetrics.rocksdbGetLatency |
El promedio de tiempo en nanosegundos para la llamada nativa RocksDB::Get subyacente. |
customMetrics.rocksdbReadBlockCacheHitCount |
Recuento de aciertos de caché de la caché de bloques en RocksDB que son útiles para evitar las lecturas del disco local. |
customMetrics.rocksdbReadBlockCacheMissCount |
El recuento de la caché de bloques en RocksDB no es útil para evitar lecturas de disco local. |
customMetrics.rocksdbSstFileSize |
Tamaño de todos los archivos de tabla ordenada estática (SST): la estructura tabular RocksDB usa para almacenar datos. |
customMetrics.rocksdbTotalBytesRead |
El número de bytes sin comprimir leídos por operaciones de get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
El número de bytes de lectura del proceso de compactación del disco. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
El número de bytes totales de los datos sin comprimir leídos mediante un iterador. Algunas operaciones con estado (por ejemplo, el procesamiento de tiempo de espera en FlatMapGroupsWithState y la marca de agua) requieren leer datos en la base de datos a través de un iterador. |
customMetrics.rocksdbTotalBytesWritten |
El número total de bytes sin comprimir escritos por operaciones de put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
El número total de bytes que escribe el proceso de compactación en el disco. |
customMetrics.rocksdbTotalCompactionLatencyMs |
El tiempo en milisegundos para compactaciones de RocksDB, incluidas las compactaciones en segundo plano y la compactación opcional iniciada durante la confirmación. |
customMetrics.rocksdbTotalFlushLatencyMs |
El tiempo total de vaciado, incluido el vaciado en segundo plano. Las operaciones de vaciado son procesos por los que MemTable se vacía en el almacenamiento una vez que está lleno. MemTables son el primer nivel donde se almacenan los datos en RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Tamaño en bytes de los archivos ZIP sin comprimir, tal como lo informa el Administrador de archivos. El administrador de archivos administra el uso y eliminación del espacio en disco del archivo SST físico. |
objeto de origen (Kafka)
Métrica | Descripción |
---|---|
sources.description |
Descripción detallada del origen de Kafka, especificando el tema exacto de Kafka que se lee. Por ejemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
Objecto sources.startOffset |
El número de desplazamiento inicial en el tema de Kafka en el que se inició el trabajo de streaming. |
Objecto sources.endOffset |
El desplazamiento más reciente procesado por el microlote. Esto podría ser igual a latestOffset para una ejecución de microlote en curso. |
Objecto sources.latestOffset |
El desplazamiento más reciente calculado por el microlote. Es posible que el proceso de microbatching no procese todos los desplazamientos cuando hay limitación, lo que da como resultado la diferenciación de endOffset y latestOffset . |
sources.numInputRows |
El número de filas de entrada procesadas desde este origen. |
sources.inputRowsPerSecond |
La tasa a la que llegan los datos para su procesamiento desde este origen. |
sources.processedRowsPerSecond |
La tasa a la que Spark está procesando datos desde este origen. |
objeto sources.metrics (Kafka)
Métrica | Descripción |
---|---|
sources.metrics.avgOffsetsBehindLatest |
El promedio del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
sources.metrics.estimatedTotalBytesBehindLatest |
El número estimado de bytes que el proceso de consulta no ha consumido de los temas suscritos. |
sources.metrics.maxOffsetsBehindLatest |
El número máximo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
sources.metrics.minOffsetsBehindLatest |
El número mínimo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
objeto receptor (Kafka)
Métrica | Descripción |
---|---|
sink.description |
La descripción del receptor de Kafka en el que está escribiendo la consulta de streaming, detallando la implementación específica del receptor de Kafka que se está usando. Por ejemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
El número de filas escritas en la tabla o receptor de salida como parte del microlote. En algunas situaciones, este valor puede ser "-1" y, por lo general, se puede interpretar como "desconocido". |
objeto de origen (Delta Lake)
Métrica | Descripción |
---|---|
sources.description |
La descripción del origen desde el que se lee la consulta de streaming. Por ejemplo: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
La versión de la serialización con la que se codifica este desplazamiento. |
sources.[startOffset/endOffset].reservoirId |
El id. de la tabla que se leerá. Se usa para detectar errores de configuración al reiniciar una consulta. |
sources.[startOffset/endOffset].reservoirVersion |
La versión de la tabla que está procesando actualmente. |
sources.[startOffset/endOffset].index |
El índice en la secuencia de AddFiles en esta versión. Esto se usa para dividir las confirmaciones grandes en varios lotes. Este índice se crea ordenando en modificationTimestamp y path . |
sources.[startOffset/endOffset].isStartingVersion |
Identifica si el desplazamiento actual marca el inicio de una nueva consulta de streaming en lugar del procesamiento de los cambios que se produjeron después de procesar los datos iniciales. Al iniciar una nueva consulta, se procesan todos los datos presentes en la tabla al principio y, a continuación, los nuevos datos que hayan llegado. |
sources.latestOffset |
El desplazamiento más reciente procesado por la consulta del microlote. |
sources.numInputRows |
El número de filas de entrada procesadas desde este origen. |
sources.inputRowsPerSecond |
La tasa a la que llegan los datos para su procesamiento desde este origen. |
sources.processedRowsPerSecond |
La tasa a la que Spark está procesando datos desde este origen. |
sources.metrics.numBytesOutstanding |
El tamaño combinado de los archivos pendientes (archivos rastreados por RocksDB). Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming. |
sources.metrics.numFilesOutstanding |
El número de archivos pendientes que se van a procesar. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming. |
objeto receptor (Delta Lake)
Métrica | Descripción |
---|---|
sink.description |
Descripción del receptor Delta, que detalla la implementación específica del receptor delta que se usa. Por ejemplo: “DeltaSink[table]” . |
sink.numOutputRows |
El número de filas es siempre "-1" porque Spark no puede inferir filas de salida para receptores DSv1, que es la clasificación para el receptor de Delta Lake. |
Ejemplos
Ejemplo de evento StreamingQueryListener de Kafka a Kafka
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Ejemplo de evento StreamingQueryListener de Delta Lake a Delta Lake
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Ejemplo de evento StreamingQueryListener de Kinesis a Delta Lake
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Ejemplo de evento StreamingQueryListener de Kafka a Delta Lake
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Ejemplo de origen de tasa para el evento StreamingQueryListener de Delta Lake
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}