스트리밍 애플리케이션의 성능, 비용 및 상태를 모니터링하는 것은 안정적이고 효율적인 ETL 파이프라인을 빌드하는 데 필수적입니다. Azure Databricks는 작업, Lakeflow Spark 선언적 파이프라인 및 Lakeflow Connect 전반에 걸쳐 다양한 관측 기능을 제공하여 병목 현상을 진단하고 성능을 최적화하며 리소스 사용 및 비용을 관리하는 데 도움이 됩니다.
이 문서에서는 다음 영역의 모범 사례를 간략하게 설명합니다.
- 주요 스트리밍 성능 메트릭
- 이벤트 로그 스키마 및 예제 쿼리
- 스트리밍 쿼리 모니터링
- 시스템 테이블을 사용한 비용 관찰
- 로그 및 메트릭을 외부 도구로 내보내기
스트리밍 관찰성을 위한 주요 메트릭
스트리밍 파이프라인을 운영하는 경우 다음 주요 메트릭을 모니터링합니다.
| Metric | Purpose |
|---|---|
| Backpressure | 파일 및 오프셋(크기)의 수를 모니터링합니다. 병목 상태를 식별하고 시스템이 뒤처지지 않고 들어오는 데이터를 처리할 수 있도록 합니다. |
| Throughput | 마이크로 일괄 처리당 처리되는 메시지 수를 추적합니다. 파이프라인 효율성을 평가하고 데이터 수집과 보조를 맞추는지 확인합니다. |
| Duration | 마이크로 일괄 처리의 평균 기간을 측정합니다. 처리 속도를 나타내며 일괄 처리 간격을 조정하는 데 도움이 됩니다. |
| Latency | 시간에 따라 처리되는 레코드/메시지 수를 나타냅니다. 엔드 투 엔드 파이프라인 지연을 이해하고 짧은 대기 시간을 최적화하는 데 도움이 됩니다. |
| 클러스터 사용률 | CPU 및 메모리 사용량(%)을 반영합니다. 효율적인 리소스 사용을 보장하고 처리 요구를 충족하도록 클러스터 크기를 조정하는 데 도움이 됩니다. |
| Network | 전송 및 수신된 데이터를 측정합니다. 네트워크 병목 상태를 식별하고 데이터 전송 성능을 향상시키는 데 유용합니다. |
| Checkpoint | 처리된 데이터 및 오프셋을 식별합니다. 일관성을 보장하고 오류 발생 시 내결함성을 활성화합니다. |
| Cost | 스트리밍 애플리케이션의 시간별, 일별 및 월별 비용을 표시합니다. 예산 및 리소스 최적화에 도움이 됩니다. |
| Lineage | 스트리밍 애플리케이션에서 만든 데이터 세트 및 계층을 표시합니다. 데이터 변환, 추적, 품질 보증 및 디버깅을 용이하게 합니다. |
클러스터 로그 및 메트릭
Azure Databricks 클러스터 로그 및 메트릭은 클러스터 성능 및 사용률에 대한 자세한 인사이트를 제공합니다. 이러한 로그 및 메트릭에는 CPU, 메모리, 디스크 I/O, 네트워크 트래픽 및 기타 시스템 메트릭에 대한 정보가 포함됩니다. 이러한 메트릭을 모니터링하는 것은 클러스터 성능을 최적화하고, 리소스를 효율적으로 관리하고, 문제를 해결하는 데 중요합니다.
Azure Databricks 클러스터 로그 및 메트릭은 클러스터 성능 및 리소스 사용률에 대한 자세한 인사이트를 제공합니다. 여기에는 CPU 및 메모리 사용량, 디스크 I/O 및 네트워크 트래픽이 포함됩니다. 이러한 메트릭을 모니터링하는 것은 다음을 위해 중요합니다.
- 클러스터 성능 최적화
- 리소스를 효율적으로 관리합니다.
- 운영 문제 해결
메트릭은 Databricks UI를 통해 활용하거나 개인 모니터링 도구로 내보낼 수 있습니다. Notebook 예제: Datadog 메트릭을 참조하세요.
Spark UI 인터페이스
Spark UI는 완료, 보류 중 및 실패한 작업 수를 포함하여 작업 및 단계의 진행 상황에 대한 자세한 정보를 표시합니다. 이를 통해 실행 흐름을 이해하고 병목 상태를 식별할 수 있습니다.
스트리밍 애플리케이션의 경우 스트리밍 탭 에는 입력 속도, 처리 속도 및 일괄 처리 기간과 같은 메트릭이 표시됩니다. 스트리밍 작업의 성능을 모니터링하고 데이터 수집 또는 처리 문제를 식별하는 데 도움이 됩니다.
자세한 내용은 Spark UI를 사용한 디버깅 을 참조하세요.
컴퓨팅 메트릭
컴퓨팅 메트릭은 클러스터 사용률을 이해하는 데 도움이 됩니다. 작업이 실행되면 크기 조정 방법 및 리소스에 미치는 영향을 확인할 수 있습니다. 메모리 압력으로 인해 OOM 오류가 발생하거나 CPU 압력으로 인해 긴 지연이 발생할 수 있는 상황을 찾아낼 수 있습니다. 표시되는 특정 메트릭은 다음과 같습니다.
- 서버 부하 분산: 지난 1분 동안의 각 노드의 CPU 사용률입니다.
- CPU 사용률: CPU가 다양한 모드(예: 사용자, 시스템, 유휴 및 iowait)에서 소요된 시간의 백분율입니다.
- 메모리 사용률: 각 모드별 총 메모리 사용량(예: 사용, 무료, 버퍼 및 캐시됨).
- 메모리 교환 사용률: 총 메모리 교환 사용량입니다.
- 무료 파일 시스템 공간: 각 탑재 지점별 총 파일 시스템 사용량입니다.
- 네트워크 처리량: 각 디바이스에서 네트워크를 통해 수신 및 전송되는 바이트 수입니다.
- 활성 노드 수: 지정된 컴퓨팅에 대한 모든 타임스탬프의 활성 노드 수입니다.
자세한 내용은 모니터 성능 및 하드웨어 메트릭 차트 를 참조하세요.
시스템 테이블
비용 모니터링
Azure Databricks 시스템 테이블은 작업 비용 및 성능을 모니터링하는 구조화된 접근 방식을 제공합니다. 다음 테이블이 포함됩니다.
- 작업 실행 세부 정보입니다.
- 리소스 사용률입니다.
- 관련 비용.
이러한 테이블을 사용하여 운영 상태 및 재무 영향을 파악합니다.
Requirements
비용 모니터링에 시스템 테이블을 사용하려면 다음을 수행합니다.
- 계정 관리자는
system.lakeflow schema을(를) 사용하도록 설정해야 합니다. - 사용자는 다음 중 하나를 수행해야 합니다.
- metastore 관리자이면서 계정 관리자이거나, 또는
- 시스템 스키마에 대한
USE및SELECT권한이 있습니다.
예제 쿼리: 가장 비용이 많이 드는 작업(지난 30일)
이 쿼리는 지난 30일 동안 가장 비용이 많이 드는 작업을 식별하여 비용 분석 및 최적화를 지원합니다.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Lakeflow Spark 선언적 파이프라인
Lakeflow Spark 선언적 파이프라인 이벤트 로그는 다음을 비롯한 모든 파이프라인 이벤트의 포괄적인 레코드를 캡처합니다.
- 감사 로그.
- 데이터 품질 검사.
- 파이프라인 진행률입니다.
- 데이터 계보.
이벤트 로그는 모든 Lakeflow Spark 선언적 파이프라인에 대해 자동으로 사용하도록 설정되며 다음을 통해 액세스할 수 있습니다.
- 파이프라인 UI: 로그를 직접 봅니다.
- 파이프라인 API: 프로그래밍 방식 액세스.
- 직접 쿼리: 이벤트 로그 테이블을 쿼리합니다.
자세한 내용은 Lakeflow Spark 선언적 파이프라인에 대한 이벤트 로그 스키마를 참조하세요.
예제 쿼리
이러한 예제 쿼리는 일괄 처리 기간, 처리량, 백프레서 및 리소스 사용률과 같은 주요 메트릭을 제공하여 파이프라인의 성능 및 상태를 모니터링하는 데 도움이 됩니다.
평균 일괄 처리 기간
이 쿼리는 파이프라인에서 처리되는 일괄 처리의 평균 기간을 계산합니다.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
평균 처리량
이 쿼리는 초당 처리된 행의 측면에서 파이프라인의 평균 처리량을 계산합니다.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
이 쿼리는 데이터 백로그를 확인하여 파이프라인의 백프레이를 측정합니다.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
클러스터 및 슬롯 사용률
이 쿼리는 파이프라인에서 사용하는 클러스터 또는 슬롯의 사용률에 대한 인사이트를 제공합니다.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
스트리밍 쿼리 수신기를 통해 작업에서 스트리밍 쿼리를 모니터링할 수 있습니다.
Spark 세션에 수신기를 연결하여Azure Databricks에서 스트리밍 쿼리 수신기를 사용하도록 설정합니다. 이 수신기는 스트리밍 쿼리의 진행률 및 메트릭을 모니터링합니다. 메트릭을 외부 모니터링 도구에 푸시하거나 추가 분석을 위해 기록하는 데 사용할 수 있습니다.
예: 외부 모니터링 도구로 메트릭 내보내기
Note
Python 및 Scala용 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.
인터페이스를 사용하여 StreamingQueryListener 경고 또는 대시보드를 위해 스트리밍 메트릭을 외부 서비스로 내보낼 수 있습니다.
다음은 수신기를 구현하는 방법의 기본 예제입니다.
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
예: Azure Databricks 내에서 쿼리 수신기 사용
다음은 Kafka-Delta Lake 스트리밍 쿼리에 대한 StreamingQueryListener 이벤트 로그의 예입니다.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
자세한 예제는 예제를 참조하세요.
쿼리 진행률 메트릭
쿼리 진행률 메트릭은 스트리밍 쿼리의 성능 및 상태를 모니터링하는 데 필수적입니다. 이러한 메트릭에는 입력 행 수, 처리 속도 및 쿼리 실행과 관련된 다양한 기간이 포함됩니다. Spark 세션에 StreamingQueryListener를 부착하여 이러한 메트릭을 관찰할 수 있습니다. 수신기는 각 스트리밍 Epoch의 끝에 이러한 메트릭을 포함하는 이벤트를 내보냅니다.
예를 들어, 수신기의 StreamingQueryProgress.observedMetrics 메서드에서 onQueryProgress 맵을 사용하여 메트릭에 액세스할 수 있습니다. 이를 통해 스트리밍 쿼리의 성능을 실시간으로 추적하고 분석할 수 있습니다.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)