다음을 통해 공유


Azure Databricks에서 구조적 스트리밍 쿼리 모니터링

Azure Databricks는 스트리밍 탭의 Spark UI를 통해 구조적 스트리밍 애플리케이션에 대한 기본 제공 모니터링을 제공합니다.

Spark UI에서 구조적 스트리밍 쿼리 구분

writeStream 코드에 .queryName(<query-name>)을 추가하여 Spark UI의 스트림에 속하는 메트릭을 쉽게 구분하여 스트림에 고유한 쿼리 이름을 제공합니다.

외부 서비스에 구조적 스트리밍 메트릭 푸시

Apache Spark의 스트리밍 쿼리 수신기 인터페이스를 사용하여 경고 또는 대시보드 사용 사례를 위해 스트리밍 메트릭을 외부 서비스로 푸시할 수 있습니다. Databricks Runtime 11.3 LTS 이상에서는 Python 및 Scala에서 스트리밍 쿼리 수신기를 사용할 수 있습니다.

Important

Unity 카탈로그에서 관리하는 자격 증명 및 개체는 논리에서 StreamingQueryListener 사용할 수 없습니다.

참고 항목

수신기를 사용하여 대기 시간을 처리하면 쿼리 처리 속도에 큰 영향을 줄 수 있습니다. 이러한 수신기에서 처리 논리를 제한하고 효율성을 위해 Kafka와 같은 빠른 응답 시스템에 쓰기를 선택하는 것이 좋습니다.

다음 코드는 수신기를 구현하기 위한 구문의 기본 예제를 제공합니다.

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

구조적 스트리밍에서 관찰 가능한 메트릭 정의

관찰 가능한 메트릭은 쿼리(DataFrame)에서 정의할 수 있는 임의의 집계 함수로 명명됩니다. DataFrame의 실행이 완료 지점에 도달하는 즉시(즉, 일괄 처리 쿼리를 완료하거나 스트리밍 Epoch에 도달) 마지막 완료 지점 이후 처리된 데이터에 대한 메트릭을 포함하는 명명된 이벤트가 내보내집니다.

수신기를 Spark 세션에 연결하여 이러한 메트릭을 관찰할 수 있습니다. 수신기는 실행 모드에 따라 달라집니다.

  • 일괄 처리 모드: QueryExecutionListener을(를) 사용합니다.

    QueryExecutionListener는 쿼리가 완료되면 호출됩니다. QueryExecution.observedMetrics 맵을 사용하여 메트릭에 액세스합니다.

  • 스트리밍 또는 마이크로배치: .를 사용합니다 StreamingQueryListener.

    StreamingQueryListener는 스트리밍 쿼리가 Epoch를 완료할 때 호출됩니다. StreamingQueryProgress.observedMetrics 맵을 사용하여 메트릭에 액세스합니다. Azure Databricks는 연속 실행 스트리밍을 지원하지 않습니다.

예시:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener 개체 메트릭

메트릭 설명
id 다시 시작할 때 지속되는 고유한 쿼리 ID입니다.
runId 시작/다시 시작할 때마다 고유한 쿼리 ID입니다. StreamingQuery.runId()를 참조하세요.
name 사용자가 지정한 쿼리 이름입니다. 이름이 지정되지 않은 경우 이름은 null입니다.
timestamp 마이크로배치 실행을 위한 타임스탬프입니다.
batchId 처리 중인 데이터의 현재 일괄 처리에 대한 고유 ID입니다. 실패 후 재시도하는 경우 지정된 일괄 처리 ID가 두 번 이상 실행될 수 있습니다. 마찬가지로 처리할 데이터가 없으면 일괄 처리 ID가 증가하지 않습니다.
numInputRows 트리거에서 처리된 레코드의 집계(모든 원본에서) 수입니다.
inputRowsPerSecond 도착하는 데이터의 집계(모든 원본에서) 속도입니다.
processedRowsPerSecond Spark가 데이터를 처리하는 집계(모든 원본에서) 속도입니다.

durationMs 개체

마이크로배치 실행 프로세스의 다양한 단계를 완료하는 데 걸리는 시간에 대한 정보입니다.

메트릭 설명
durationMs.addBatch 마이크로배치를 실행하는 데 걸린 시간입니다. 그러면 Spark가 마이크로배치를 계획하는 데 걸리는 시간이 제외됩니다.
durationMs.getBatch 원본에서 오프셋에 대한 메타데이터를 검색하는 데 걸리는 시간입니다.
durationMs.latestOffset 마이크로배치에 사용되는 최신 오프셋입니다. 이 진행률 개체는 원본에서 최신 오프셋을 검색하는 데 걸린 시간을 나타냅니다.
durationMs.queryPlanning 실행 계획을 생성하는 데 걸린 시간입니다.
durationMs.triggerExecution 마이크로배치를 계획하고 실행하는 데 걸리는 시간입니다.
durationMs.walCommit 사용 가능한 새 오프셋을 커밋하는 데 걸린 시간입니다.

eventTime 개체

마이크로배치에서 처리되는 데이터 내에서 표시되는 이벤트 시간 값에 대한 정보입니다. 이 데이터는 워터마크에서 구조적 스트리밍 작업에 정의된 상태 저장 집계를 처리하기 위해 상태를 트리밍하는 방법을 파악하는 데 사용됩니다.

메트릭 설명
eventTime.avg 해당 트리거에 표시되는 평균 이벤트 시간입니다.
eventTime.max 해당 트리거에 표시되는 최대 이벤트 시간입니다.
eventTime.min 해당 트리거에 표시되는 최소 이벤트 시간입니다.
eventTime.watermark 해당 트리거에 사용되는 워터마크의 값입니다.

stateOperators 개체

구조적 스트리밍 작업에 정의된 상태 저장 작업 및 해당 작업에서 생성된 집계에 대한 정보입니다.

메트릭 설명
stateOperators.operatorName 메트릭이 관련되는 상태 저장 연산자의 이름(예: symmetricHashJoin, dedupe. stateStoreSave
stateOperators.numRowsTotal 상태 저장 연산자 또는 집계의 결과로 발생한 상태의 총 행 수입니다.
stateOperators.numRowsUpdated 상태 저장 연산자 또는 집계의 결과로 상태에서 업데이트된 총 행 수입니다.
stateOperators.allUpdatesTimeMs 이 메트릭은 현재 Spark에서 측정할 수 없으며 향후 업데이트에서 제거될 예정입니다.
stateOperators.numRowsRemoved 상태 저장 연산자 또는 집계의 결과로 상태에서 제거된 총 행 수입니다.
stateOperators.allRemovalsTimeMs 이 메트릭은 현재 Spark에서 측정할 수 없으며 향후 업데이트에서 제거될 예정입니다.
stateOperators.commitTimeMs 모든 업데이트를 커밋하고(배치 및 제거) 새 버전을 반환하는 데 걸린 시간입니다.
stateOperators.memoryUsedBytes 상태 저장소에서 사용하는 메모리입니다.
stateOperators.numRowsDroppedByWatermark 상태 저장 집계에 포함하기에는 너무 늦은 것으로 간주되는 행의 수입니다. 스트리밍 집계만 해당: 집계 후 삭제된 행의 수입니다(원시 입력 행이 아님). 이 숫자는 정확하지는 않지만 삭제되는 지연 데이터가 있음을 나타냅니다.
stateOperators.numShufflePartitions 이 상태 저장 연산자의 순서 섞기 파티션 수입니다.
stateOperators.numStateStoreInstances 연산자가 초기화하고 유지 관리한 실제 상태 저장소 인스턴스입니다. 많은 상태 저장 연산자의 경우 파티션 수와 동일합니다. 그러나 스트림 스트림 조인은 파티션당 4개의 상태 저장소 인스턴스를 초기화합니다.

stateOperators.customMetrics 개체

Structured Streaming 작업에 대해 유지 관리되는 상태 저장 값과 관련하여 해당 성능 및 작업에 대한 메트릭을 캡처하는 RocksDB에서 수집된 정보입니다. 자세한 내용은 Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.

메트릭 설명
customMetrics.rocksdbBytesCopied RocksDB 파일 관리자가 추적한 대로 복사한 바이트 수입니다.
customMetrics.rocksdbCommitCheckpointLatency 네이티브 RocksDB의 스냅샷을 만들고 로컬 디렉터리에 쓰는 시간(밀리초)입니다.
customMetrics.rocksdbCompactLatency 검사점 커밋 중 압축(선택 사항)의 시간(밀리초)입니다.
customMetrics.rocksdbCommitFileSyncLatencyMs 네이티브 RocksDB 스냅샷을 외부 스토리지(검사점 위치)에 동기화하는 시간(밀리초)입니다.
customMetrics.rocksdbCommitFlushLatency RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 시간(밀리초)입니다.
customMetrics.rocksdbCommitPauseLatency 압축과 같이 검사점 커밋의 일부로 백그라운드 작업자 스레드를 중지하는 시간(밀리초)입니다.
customMetrics.rocksdbCommitWriteBatchLatency 스테이징된 쓰기를 메모리 내 구조()에서 네이티브 RocksDB에 적용하는 시간(WriteBatch밀리초)입니다.
customMetrics.rocksdbFilesCopied RocksDB 파일 관리자가 추적한 대로 복사한 파일 수입니다.
customMetrics.rocksdbFilesReused RocksDB 파일 관리자가 추적한 대로 다시 사용하는 파일 수입니다.
customMetrics.rocksdbGetCount DB에 대한 호출 수 get 입니다(준비 쓰기에 사용되는 메모리 내 일괄 처리에서 포함되지 gets WriteBatch 않음).
customMetrics.rocksdbGetLatency 기본 네이티브 RocksDB::Get 호출의 평균 시간(나노초)입니다.
customMetrics.rocksdbReadBlockCacheHitCount 로컬 디스크 읽기를 방지하는 데 유용한 RocksDB의 블록 캐시에서 캐시 적중 횟수입니다.
customMetrics.rocksdbReadBlockCacheMissCount RocksDB의 블록 캐시 수는 로컬 디스크 읽기를 방지하는 데 유용하지 않습니다.
customMetrics.rocksdbSstFileSize 모든 SST(정적 정렬 테이블) 파일의 크기 - RocksDB가 데이터를 저장하는 데 사용하는 테이블 형식 구조입니다.
customMetrics.rocksdbTotalBytesRead 작업에서 읽 get 은 압축되지 않은 바이트 수입니다.
customMetrics.rocksdbTotalBytesReadByCompaction 압축 프로세스가 디스크에서 읽는 바이트 수입니다.
customMetrics.rocksdbTotalBytesReadThroughIterator 반복기를 사용하여 읽은 압축되지 않은 데이터의 총 바이트 수입니다. 일부 상태 저장 작업(예: 시간 제한 처리 및 FlatMapGroupsWithState 워터마크)에는 반복기를 통해 DB에서 데이터를 읽어야 합니다.
customMetrics.rocksdbTotalBytesWritten 연산으로 작성된 put 압축되지 않은 총 바이트 수입니다.
customMetrics.rocksdbTotalBytesWrittenByCompaction 압축 프로세스가 디스크에 쓰는 총 바이트 수입니다.
customMetrics.rocksdbTotalCompactionLatencyMs 백그라운드 압축 및 커밋 중에 시작된 선택적 압축을 포함하여 RocksDB 압축의 시간(밀리초)입니다.
customMetrics.rocksdbTotalFlushLatencyMs 백그라운드 플러시를 포함한 총 플러시 시간입니다. 플러시 작업은 가득 차면 스토리지로 플러시되는 프로세스 MemTable 입니다. MemTables 는 데이터가 RocksDB에 저장되는 첫 번째 수준입니다.
customMetrics.rocksdbZipFileBytesUncompressed 파일 관리자가 보고한 압축되지 않은 zip 파일의 크기(바이트)입니다. 파일 관리자는 물리적 SST 파일 디스크 공간 사용률 및 삭제를 관리합니다.

sources 개체(Kafka)

메트릭 설명
sources.description 읽는 정확한 Kafka 토픽을 지정하는 Kafka 원본에 대한 자세한 설명입니다. 예: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset 개체 스트리밍 작업이 시작된 Kafka 토픽 내의 시작 오프셋 번호입니다.
sources.endOffset 개체 마이크로배치에서 처리한 마지막 오프셋입니다. 이는 진행 중인 마이크로배치 실행의 경우와 같을 latestOffset 수 있습니다.
sources.latestOffset 개체 마이크로배치가 파악한 최신 오프셋입니다. 마이크로배칭 프로세스는 제한이 있을 때 모든 오프셋을 처리하지 않을 수 있으며, 이로 인해 차이가 발생 endOffset latestOffset 합니다.
sources.numInputRows 이 원본에서 처리된 입력 행의 수입니다.
sources.inputRowsPerSecond 이 원본에서 처리하기 위해 데이터가 도착하는 속도입니다.
sources.processedRowsPerSecond Spark가 이 원본의 데이터를 처리하는 속도입니다.

sources.metrics 개체(Kafka)

메트릭 설명
sources.metrics.avgOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 평균 오프셋 수입니다.
sources.metrics.estimatedTotalBytesBehindLatest 구독된 토픽에서 쿼리 프로세스가 사용하지 않은 예상 바이트 수입니다.
sources.metrics.maxOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 최대 오프셋 수입니다.
sources.metrics.minOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 최소 오프셋 수입니다.

sink 개체(Kafka)

메트릭 설명
sink.description 스트리밍 쿼리가 작성 중인 Kafka 싱크에 대한 설명으로, 사용 중인 특정 Kafka 싱크 구현을 자세히 설명합니다. 예: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows 마이크로배치의 일부로 출력 테이블 또는 싱크에 기록된 행 수입니다. 경우에 따라 이 값은 "-1"일 수 있으며 일반적으로 "알 수 없음"으로 해석될 수 있습니다.

sources 개체(Delta Lake)

메트릭 설명
sources.description 스트리밍 쿼리가 읽는 원본에 대한 설명입니다. 예: “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion 이 오프셋이 인코딩되는 serialization의 버전입니다.
sources.[startOffset/endOffset].reservoirId 읽는 테이블의 ID입니다. 쿼리를 다시 시작할 때 잘못된 구성을 검색하는 데 사용됩니다.
sources.[startOffset/endOffset].reservoirVersion 현재 처리 중인 테이블의 버전입니다.
sources.[startOffset/endOffset].index 이 버전의 시퀀스에 있는 인덱스입니다 AddFiles . 이는 큰 커밋을 여러 일괄 처리로 분리하는 데 사용됩니다. 이 인덱스는 정렬하여 modificationTimestamp path만들어집니다.
sources.[startOffset/endOffset].isStartingVersion 초기 데이터가 처리된 후 발생한 변경 내용을 처리하는 대신 현재 오프셋이 새 스트리밍 쿼리의 시작을 표시하는지 여부를 식별합니다. 새 쿼리를 시작할 때 시작 시 테이블에 있는 모든 데이터가 먼저 처리된 다음 도착하는 모든 새 데이터가 처리됩니다.
sources.latestOffset 마이크로배치 쿼리에서 처리된 최신 오프셋입니다.
sources.numInputRows 이 원본에서 처리된 입력 행의 수입니다.
sources.inputRowsPerSecond 이 원본에서 처리하기 위해 데이터가 도착하는 속도입니다.
sources.processedRowsPerSecond Spark가 이 원본의 데이터를 처리하는 속도입니다.
sources.metrics.numBytesOutstanding 미해결 파일(RocksDB에서 추적한 파일)의 결합된 크기입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다.
sources.metrics.numFilesOutstanding 처리할 미해결 파일 수입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다.

sink 개체(Delta Lake)

메트릭 설명
sink.description 사용되는 특정 델타 싱크 구현을 자세히 설명하는 델타 싱크에 대한 설명입니다. 예: “DeltaSink[table]”
sink.numOutputRows Spark는 Delta Lake 싱크에 대한 분류인 DSv1 싱크의 출력 행을 유추할 수 없으므로 행 수는 항상 "-1"입니다.

예제

Kafka-to-Kafka StreamingQueryListener 이벤트 예제

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

예제 Delta Lake-To-Delta Lake StreamingQueryListener 이벤트

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis-Delta Lake StreamingQueryListener 이벤트 예제

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake-To-Delta Lake StreamingQueryListener 이벤트 예제

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener 이벤트에 대한 예제 속도 원본

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}