Partilhar via


Monitorizar as consultas de Transmissão em Fluxo Estruturada no Azure Databricks

O Azure Databricks fornece monitoramento interno para aplicativos de Streaming Estruturado por meio da interface do usuário do Spark na guia Streaming .

Distinguir consultas de streaming estruturado na interface do usuário do Spark

Forneça aos seus streams um nome de consulta exclusivo adicionando .queryName(<query-name>) ao seu writeStream código para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.

Envie métricas de Streaming estruturado para serviços externos

As métricas de streaming podem ser enviadas por push para serviços externos para alertas ou painéis de casos de uso usando a interface Streaming Query Listener do Apache Spark. No Databricks Runtime 11.3 LTS e superior, o Streaming Query Listener está disponível em Python e Scala.

Importante

Credenciais e objetos gerenciados pelo Unity Catalog não podem ser usados na StreamingQueryListener lógica.

Nota

A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. É aconselhável limitar a lógica de processamento nesses ouvintes e optar por escrever em sistemas de resposta rápida como Kafka para eficiência.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definição de métricas observáveis no Streaming Estruturado

As métricas observáveis são chamadas de funções agregadas arbitrárias que podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), é emitido um evento nomeado que contém as métricas para os dados processados desde o último ponto de conclusão.

Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • Modo de lote: Use QueryExecutionListener.

    QueryExecutionListener é chamado quando a consulta é concluída. Acesse as métricas usando o QueryExecution.observedMetrics mapa.

  • Streaming, ou microlote: Use StreamingQueryListener.

    StreamingQueryListener é chamado quando a consulta de streaming completa uma época. Acesse as métricas usando o StreamingQueryProgress.observedMetrics mapa. O Azure Databricks não oferece suporte ao streaming de execução contínua.

Por exemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Métricas de objeto StreamingQueryListener

Métrico Description
id Um ID de consulta exclusivo que persiste nas reinicializações.
runId Uma id de consulta que é exclusiva para cada início/reinício. Consulte StreamingQuery.runId().
name O nome especificado pelo usuário da consulta. Nome é nulo se nenhum nome for especificado.
timestamp O carimbo de data/hora para a execução do microlote.
batchId Um ID exclusivo para o lote atual de dados que está sendo processado. No caso de novas tentativas após uma falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID do lote não é incrementado.
numInputRows O número agregado (em todas as fontes) de registros processados em um gatilho.
inputRowsPerSecond A taxa agregada (em todas as fontes) de dados que chegam.
processedRowsPerSecond A taxa agregada (em todas as fontes) na qual o Spark está processando dados.

objeto durationMs

Informações sobre o tempo necessário para concluir várias etapas do processo de execução do microlote.

Métrico Description
durationMs.addBatch O tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote.
durationMs.getBatch O tempo necessário para recuperar os metadados sobre os deslocamentos da fonte.
durationMs.latestOffset O último offset consumido para o microlote. Este objeto de progresso refere-se ao tempo necessário para recuperar o deslocamento mais recente das fontes.
durationMs.queryPlanning O tempo necessário para gerar o plano de execução.
durationMs.triggerExecution O tempo necessário para planear e executar o microlote.
durationMs.walCommit O tempo necessário para comprometer as novas compensações disponíveis.

objeto eventTime

Informações sobre o valor de tempo do evento visto nos dados que estão sendo processados no microlote. Esses dados são usados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no trabalho de Streaming Estruturado.

Métrico Description
eventTime.avg O tempo médio de evento visto nesse gatilho.
eventTime.max O tempo máximo de evento visto nesse gatilho.
eventTime.min O tempo mínimo de evento visto nesse gatilho.
eventTime.watermark O valor da marca d'água usada nesse gatilho.

objeto stateOperators

Informações sobre as operações com estado definidas no trabalho de Streaming Estruturado e as agregações produzidas a partir delas.

Métrico Description
stateOperators.operatorName O nome do operador stateful ao qual as métricas se relacionam, como symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal O número total de linhas no estado como resultado de um operador ou agregação com monitoração de estado.
stateOperators.numRowsUpdated O número total de linhas atualizadas no estado como resultado de um operador ou agregação com monitoração de estado.
stateOperators.allUpdatesTimeMs Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras.
stateOperators.numRowsRemoved O número total de linhas removidas do estado como resultado de um operador ou agregação com monitoração de estado.
stateOperators.allRemovalsTimeMs Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras.
stateOperators.commitTimeMs O tempo necessário para confirmar todas as atualizações (coloca e remove) e retornar uma nova versão.
stateOperators.memoryUsedBytes Memória usada pelo armazenamento de estado.
stateOperators.numRowsDroppedByWatermark O número de linhas que são consideradas tardias demais para serem incluídas em uma agregação com estado. Somente agregações de streaming: o número de linhas descartadas pós-agregação (não linhas de entrada brutas). Este número não é preciso, mas fornece uma indicação de que há dados atrasados sendo descartados.
stateOperators.numShufflePartitions O número de partições aleatórias para este operador stateful.
stateOperators.numStateStoreInstances A instância de armazenamento de estado real que o operador inicializou e manteve. Para muitos operadores com estado, isso é o mesmo que o número de partições. No entanto, as junções de fluxo inicializam quatro instâncias de armazenamento de estado por partição.

objeto stateOperators.customMetrics

Informações coletadas do RocksDB capturando métricas sobre seu desempenho e operações em relação aos valores de estado que mantém para o trabalho de Streaming Estruturado. Para obter mais informações, consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Métrico Description
customMetrics.rocksdbBytesCopied O número de bytes copiados conforme rastreados pelo Gerenciador de arquivos RocksDB.
customMetrics.rocksdbCommitCheckpointLatency O tempo em milissegundos tirar um instantâneo do RocksDB nativo e gravá-lo em um diretório local.
customMetrics.rocksdbCompactLatency O tempo em milissegundos de compactação (opcional) durante a confirmação do ponto de verificação.
customMetrics.rocksdbCommitFileSyncLatencyMs O tempo em milissegundos sincronizando o snapshot nativo do RocksDB com o armazenamento externo (o local do ponto de verificação).
customMetrics.rocksdbCommitFlushLatency O tempo em milissegundos liberando o RocksDB na memória muda para o disco local.
customMetrics.rocksdbCommitPauseLatency O tempo em milissegundos parando os threads de trabalho em segundo plano como parte da confirmação do ponto de verificação, como para compactação.
customMetrics.rocksdbCommitWriteBatchLatency O tempo em milissegundos aplicando as gravações em estágios na estrutura na memória (WriteBatch) ao RocksDB nativo.
customMetrics.rocksdbFilesCopied O número de arquivos copiados conforme rastreados pelo Gerenciador de arquivos RocksDB.
customMetrics.rocksdbFilesReused O número de arquivos reutilizados conforme rastreado pelo RocksDB File Manager.
customMetrics.rocksdbGetCount O número de chamadas para o banco de get dados (não inclui gets de - lote na WriteBatch memória usado para gravações de preparação).
customMetrics.rocksdbGetLatency O tempo médio em nanossegundos para a chamada nativa RocksDB::Get subjacente.
customMetrics.rocksdbReadBlockCacheHitCount A contagem de acertos de cache do cache de bloco no RocksDB que são úteis para evitar leituras de disco local.
customMetrics.rocksdbReadBlockCacheMissCount A contagem do cache de bloco no RocksDB não é útil para evitar leituras de disco local.
customMetrics.rocksdbSstFileSize O tamanho de todos os arquivos SST (Static Sorted Table) - a estrutura tabular que o RocksDB usa para armazenar dados.
customMetrics.rocksdbTotalBytesRead O número de bytes não compactados lidos por get operações.
customMetrics.rocksdbTotalBytesReadByCompaction O número de bytes que o processo de compactação lê do disco.
customMetrics.rocksdbTotalBytesReadThroughIterator O número total de bytes de dados não compactados lidos usando um iterador. Algumas operações com monitoração de estado (por exemplo, processamento de tempo limite e FlatMapGroupsWithState marca d'água) exigem a leitura de dados no banco de dados por meio de um iterador.
customMetrics.rocksdbTotalBytesWritten O número total de bytes não compactados gravados por put operações.
customMetrics.rocksdbTotalBytesWrittenByCompaction O número total de bytes que o processo de compactação grava no disco.
customMetrics.rocksdbTotalCompactionLatencyMs O tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante a confirmação.
customMetrics.rocksdbTotalFlushLatencyMs O tempo total de descarga, incluindo lavagem de fundo. As operações de descarga são processos pelos quais o é liberado MemTable para o armazenamento assim que estiver cheio. MemTables são o primeiro nível onde os dados são armazenados no RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed O tamanho em bytes dos arquivos zip não compactados, conforme relatado pelo Gerenciador de arquivos. O Gerenciador de arquivos gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico.

objeto sources (Kafka)

Métrico Description
sources.description Uma descrição detalhada da fonte Kafka, especificando o tópico exato de Kafka que está sendo lido. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset objeto O número de deslocamento inicial dentro do tópico Kafka no qual o trabalho de streaming começou.
sources.endOffset objeto O último offset processado pelo microlote. Isso pode ser igual a latestOffset uma execução de microlote em andamento.
sources.latestOffset objeto O último offset figurado pelo microlote. O processo de microbatching pode não processar todas as compensações quando há limitação, o que resulta em endOffset e latestOffset differiong.
sources.numInputRows O número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond O ritmo a que os dados estão a chegar para processamento a partir desta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados dessa fonte.

objeto sources.metrics (Kafka)

Métrico Description
sources.metrics.avgOffsetsBehindLatest O número médio de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.
sources.metrics.estimatedTotalBytesBehindLatest O número estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos.
sources.metrics.maxOffsetsBehindLatest O número máximo de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.
sources.metrics.minOffsetsBehindLatest O número mínimo de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.

objeto da pia (Kafka)

Métrico Description
sink.description A descrição do coletor Kafka no qual a consulta de streaming está escrevendo, detalhando a implementação específica do coletor Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows O número de linhas que foram gravadas na tabela de saída ou no coletor como parte do microlote. Para algumas situações, este valor pode ser "-1" e geralmente pode ser interpretado como "desconhecido".

objeto sources (Delta Lake)

Métrico Description
sources.description A descrição da fonte a partir da qual a consulta de streaming está sendo lida. Por exemplo: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion A versão da serialização com a qual esse deslocamento é codificado.
sources.[startOffset/endOffset].reservoirId A ID da tabela que está sendo lida. Isso é usado para detetar erros de configuração ao reiniciar uma consulta.
sources.[startOffset/endOffset].reservoirVersion A versão da tabela que está sendo processada no momento.
sources.[startOffset/endOffset].index O índice na sequência de AddFiles nesta versão. Isso é usado para dividir grandes confirmações em vários lotes. Esse índice é criado classificando em modificationTimestamp e path.
sources.[startOffset/endOffset].isStartingVersion Identifica se o deslocamento atual marca o início de uma nova consulta de streaming em vez do processamento de alterações que ocorreram após o processamento dos dados iniciais. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados primeiro e, em seguida, todos os novos dados que chegam.
sources.latestOffset O último deslocamento processado pela consulta de microlote.
sources.numInputRows O número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond O ritmo a que os dados estão a chegar para processamento a partir desta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados dessa fonte.
sources.metrics.numBytesOutstanding O tamanho combinado dos arquivos pendentes (arquivos rastreados pelo RocksDB). Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming.
sources.metrics.numFilesOutstanding O número de processos pendentes a processar. Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming.

objeto de pia (Lago Delta)

Métrico Description
sink.description A descrição do sumidouro Delta, detalhando a implementação específica do sumidouro Delta que está sendo usada. Por exemplo: “DeltaSink[table]”.
sink.numOutputRows O número de linhas é sempre "-1" porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o dissipador Delta Lake.

Exemplos

Exemplo de evento Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemplo de evento Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exemplo de evento Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Exemplo de fonte de taxa para o evento Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}