다음을 통해 공유


작업, Lakeflow Spark 선언적 파이프라인 및 Lakeflow Connect에 대한 Azure Databricks의 관찰 가능성

스트리밍 애플리케이션의 성능, 비용 및 상태를 모니터링하는 것은 안정적이고 효율적인 ETL 파이프라인을 빌드하는 데 필수적입니다. Azure Databricks는 작업, Lakeflow Spark 선언적 파이프라인 및 Lakeflow Connect 전반에 걸쳐 다양한 관측 기능을 제공하여 병목 현상을 진단하고 성능을 최적화하며 리소스 사용 및 비용을 관리하는 데 도움이 됩니다.

이 문서에서는 다음 영역의 모범 사례를 간략하게 설명합니다.

  • 주요 스트리밍 성능 메트릭
  • 이벤트 로그 스키마 및 예제 쿼리
  • 스트리밍 쿼리 모니터링
  • 시스템 테이블을 사용한 비용 관찰
  • 로그 및 메트릭을 외부 도구로 내보내기

스트리밍 관찰성을 위한 주요 메트릭

스트리밍 파이프라인을 운영하는 경우 다음 주요 메트릭을 모니터링합니다.

Metric Purpose
Backpressure 파일 및 오프셋(크기)의 수를 모니터링합니다. 병목 상태를 식별하고 시스템이 뒤처지지 않고 들어오는 데이터를 처리할 수 있도록 합니다.
Throughput 마이크로 일괄 처리당 처리되는 메시지 수를 추적합니다. 파이프라인 효율성을 평가하고 데이터 수집과 보조를 맞추는지 확인합니다.
Duration 마이크로 일괄 처리의 평균 기간을 측정합니다. 처리 속도를 나타내며 일괄 처리 간격을 조정하는 데 도움이 됩니다.
Latency 시간에 따라 처리되는 레코드/메시지 수를 나타냅니다. 엔드 투 엔드 파이프라인 지연을 이해하고 짧은 대기 시간을 최적화하는 데 도움이 됩니다.
클러스터 사용률 CPU 및 메모리 사용량(%)을 반영합니다. 효율적인 리소스 사용을 보장하고 처리 요구를 충족하도록 클러스터 크기를 조정하는 데 도움이 됩니다.
Network 전송 및 수신된 데이터를 측정합니다. 네트워크 병목 상태를 식별하고 데이터 전송 성능을 향상시키는 데 유용합니다.
Checkpoint 처리된 데이터 및 오프셋을 식별합니다. 일관성을 보장하고 오류 발생 시 내결함성을 활성화합니다.
Cost 스트리밍 애플리케이션의 시간별, 일별 및 월별 비용을 표시합니다. 예산 및 리소스 최적화에 도움이 됩니다.
Lineage 스트리밍 애플리케이션에서 만든 데이터 세트 및 계층을 표시합니다. 데이터 변환, 추적, 품질 보증 및 디버깅을 용이하게 합니다.

클러스터 로그 및 메트릭

Azure Databricks 클러스터 로그 및 메트릭은 클러스터 성능 및 사용률에 대한 자세한 인사이트를 제공합니다. 이러한 로그 및 메트릭에는 CPU, 메모리, 디스크 I/O, 네트워크 트래픽 및 기타 시스템 메트릭에 대한 정보가 포함됩니다. 이러한 메트릭을 모니터링하는 것은 클러스터 성능을 최적화하고, 리소스를 효율적으로 관리하고, 문제를 해결하는 데 중요합니다.

Azure Databricks 클러스터 로그 및 메트릭은 클러스터 성능 및 리소스 사용률에 대한 자세한 인사이트를 제공합니다. 여기에는 CPU 및 메모리 사용량, 디스크 I/O 및 네트워크 트래픽이 포함됩니다. 이러한 메트릭을 모니터링하는 것은 다음을 위해 중요합니다.

  • 클러스터 성능 최적화
  • 리소스를 효율적으로 관리합니다.
  • 운영 문제 해결

메트릭은 Databricks UI를 통해 활용하거나 개인 모니터링 도구로 내보낼 수 있습니다. Notebook 예제: Datadog 메트릭을 참조하세요.

Spark UI 인터페이스

Spark UI는 완료, 보류 중 및 실패한 작업 수를 포함하여 작업 및 단계의 진행 상황에 대한 자세한 정보를 표시합니다. 이를 통해 실행 흐름을 이해하고 병목 상태를 식별할 수 있습니다.

스트리밍 애플리케이션의 경우 스트리밍 탭 에는 입력 속도, 처리 속도 및 일괄 처리 기간과 같은 메트릭이 표시됩니다. 스트리밍 작업의 성능을 모니터링하고 데이터 수집 또는 처리 문제를 식별하는 데 도움이 됩니다.

자세한 내용은 Spark UI를 사용한 디버깅 을 참조하세요.

컴퓨팅 메트릭

컴퓨팅 메트릭은 클러스터 사용률을 이해하는 데 도움이 됩니다. 작업이 실행되면 크기 조정 방법 및 리소스에 미치는 영향을 확인할 수 있습니다. 메모리 압력으로 인해 OOM 오류가 발생하거나 CPU 압력으로 인해 긴 지연이 발생할 수 있는 상황을 찾아낼 수 있습니다. 표시되는 특정 메트릭은 다음과 같습니다.

  • 서버 부하 분산: 지난 1분 동안의 각 노드의 CPU 사용률입니다.
  • CPU 사용률: CPU가 다양한 모드(예: 사용자, 시스템, 유휴 및 iowait)에서 소요된 시간의 백분율입니다.
  • 메모리 사용률: 각 모드별 총 메모리 사용량(예: 사용, 무료, 버퍼 및 캐시됨).
  • 메모리 교환 사용률: 총 메모리 교환 사용량입니다.
  • 무료 파일 시스템 공간: 각 탑재 지점별 총 파일 시스템 사용량입니다.
  • 네트워크 처리량: 각 디바이스에서 네트워크를 통해 수신 및 전송되는 바이트 수입니다.
  • 활성 노드 수: 지정된 컴퓨팅에 대한 모든 타임스탬프의 활성 노드 수입니다.

자세한 내용은 모니터 성능하드웨어 메트릭 차트 를 참조하세요.

시스템 테이블

비용 모니터링

Azure Databricks 시스템 테이블은 작업 비용 및 성능을 모니터링하는 구조화된 접근 방식을 제공합니다. 다음 테이블이 포함됩니다.

  • 작업 실행 세부 정보입니다.
  • 리소스 사용률입니다.
  • 관련 비용.

이러한 테이블을 사용하여 운영 상태 및 재무 영향을 파악합니다.

Requirements

비용 모니터링에 시스템 테이블을 사용하려면 다음을 수행합니다.

  • 계정 관리자는 system.lakeflow schema을(를) 사용하도록 설정해야 합니다.
  • 사용자는 다음 중 하나를 수행해야 합니다.
    • metastore 관리자이면서 계정 관리자이거나, 또는
    • 시스템 스키마에 대한 USESELECT 권한이 있습니다.

예제 쿼리: 가장 비용이 많이 드는 작업(지난 30일)

이 쿼리는 지난 30일 동안 가장 비용이 많이 드는 작업을 식별하여 비용 분석 및 최적화를 지원합니다.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Lakeflow Spark 선언적 파이프라인

Lakeflow Spark 선언적 파이프라인 이벤트 로그는 다음을 비롯한 모든 파이프라인 이벤트의 포괄적인 레코드를 캡처합니다.

  • 감사 로그.
  • 데이터 품질 검사.
  • 파이프라인 진행률입니다.
  • 데이터 계보.

이벤트 로그는 모든 Lakeflow Spark 선언적 파이프라인에 대해 자동으로 사용하도록 설정되며 다음을 통해 액세스할 수 있습니다.

  • 파이프라인 UI: 로그를 직접 봅니다.
  • 파이프라인 API: 프로그래밍 방식 액세스.
  • 직접 쿼리: 이벤트 로그 테이블을 쿼리합니다.

자세한 내용은 Lakeflow Spark 선언적 파이프라인에 대한 이벤트 로그 스키마를 참조하세요.

예제 쿼리

이러한 예제 쿼리는 일괄 처리 기간, 처리량, 백프레서 및 리소스 사용률과 같은 주요 메트릭을 제공하여 파이프라인의 성능 및 상태를 모니터링하는 데 도움이 됩니다.

평균 일괄 처리 기간

이 쿼리는 파이프라인에서 처리되는 일괄 처리의 평균 기간을 계산합니다.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

평균 처리량

이 쿼리는 초당 처리된 행의 측면에서 파이프라인의 평균 처리량을 계산합니다.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

이 쿼리는 데이터 백로그를 확인하여 파이프라인의 백프레이를 측정합니다.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

클러스터 및 슬롯 사용률

이 쿼리는 파이프라인에서 사용하는 클러스터 또는 슬롯의 사용률에 대한 인사이트를 제공합니다.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

스트리밍 쿼리 수신기를 통해 작업에서 스트리밍 쿼리를 모니터링할 수 있습니다.

Spark 세션에 수신기를 연결하여Azure Databricks에서 스트리밍 쿼리 수신기를 사용하도록 설정합니다. 이 수신기는 스트리밍 쿼리의 진행률 및 메트릭을 모니터링합니다. 메트릭을 외부 모니터링 도구에 푸시하거나 추가 분석을 위해 기록하는 데 사용할 수 있습니다.

예: 외부 모니터링 도구로 메트릭 내보내기

Note

Python 및 Scala용 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.

인터페이스를 사용하여 StreamingQueryListener 경고 또는 대시보드를 위해 스트리밍 메트릭을 외부 서비스로 내보낼 수 있습니다.

다음은 수신기를 구현하는 방법의 기본 예제입니다.

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

예: Azure Databricks 내에서 쿼리 수신기 사용

다음은 Kafka-Delta Lake 스트리밍 쿼리에 대한 StreamingQueryListener 이벤트 로그의 예입니다.

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

자세한 예제는 예제를 참조하세요.

쿼리 진행률 메트릭

쿼리 진행률 메트릭은 스트리밍 쿼리의 성능 및 상태를 모니터링하는 데 필수적입니다. 이러한 메트릭에는 입력 행 수, 처리 속도 및 쿼리 실행과 관련된 다양한 기간이 포함됩니다. Spark 세션에 StreamingQueryListener를 부착하여 이러한 메트릭을 관찰할 수 있습니다. 수신기는 각 스트리밍 Epoch의 끝에 이러한 메트릭을 포함하는 이벤트를 내보냅니다.

예를 들어, 수신기의 StreamingQueryProgress.observedMetrics 메서드에서 onQueryProgress 맵을 사용하여 메트릭에 액세스할 수 있습니다. 이를 통해 스트리밍 쿼리의 성능을 실시간으로 추적하고 분석할 수 있습니다.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)