Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Мониторинг производительности, затрат и работоспособности приложений потоковой передачи необходим для создания надежных, эффективных конвейеров ETL. Azure Databricks предоставляет широкий набор функций наблюдаемости в заданиях, Декларативных конвейерах Lakeflow Spark и Lakeflow Connect для диагностики узких мест, оптимизации производительности и управления использованием ресурсов и затратами.
В этой статье описаны рекомендации в следующих областях:
- Ключевые метрики производительности потоковой передачи
- Схемы журналов событий и примеры запросов
- Мониторинг потоковой обработки запросов
- Наблюдаемость затрат с помощью системных таблиц
- Экспорт журналов и метрик во внешние средства
Ключевые метрики для отслеживания потоковой передачи
При работе с конвейерами потоковой передачи отслеживайте следующие ключевые метрики:
| Metric | Purpose |
|---|---|
| Backpressure | Отслеживает количество файлов и смещения (размеров). Помогает выявлять узкие места и гарантирует, что система может обрабатывать входящие данные без отставания. |
| Throughput | Отслеживает количество сообщений, обработанных на микропакет. Оцените эффективность конвейера и убедитесь, что он удерживает темп с поглощением данных. |
| Duration | Измеряет среднюю длительность микропартии. Указывает скорость обработки и помогает настроить интервалы пакетной обработки. |
| Latency | Указывает, сколько записей и сообщений обрабатывается с течением времени. Помогает понять сквозные задержки конвейера и оптимизировать их для снижения задержки. |
| Использование кластера | Отражает использование ЦП и памяти (%). Обеспечивает эффективное использование ресурсов и помогает масштабировать кластеры для удовлетворения требований к обработке. |
| Network | Измерение переданных и полученных данных. Полезно для выявления узких мест сети и повышения производительности передачи данных. |
| Checkpoint | Определяет обработанные данные и смещения. Обеспечивает согласованность и обеспечивает отказоустойчивость во время сбоев. |
| Cost | Отображает почасовые, ежедневные и ежемесячные затраты на приложение потоковой передачи. Помогает в оптимизации бюджета и ресурсов. |
| Lineage | Отображает наборы данных и слои, созданные в приложении потоковой передачи. Упрощает преобразование данных, отслеживание, обеспечение качества и отладку. |
Журналы и метрики кластера
Журналы и метрики кластера Azure Databricks предоставляют подробные сведения о производительности и использовании кластера. Эти журналы и метрики включают сведения о ЦП, памяти, операций ввода-вывода диска, сетевом трафике и других системных метрик. Мониторинг этих метрик имеет решающее значение для оптимизации производительности кластера, эффективного управления ресурсами и устранения неполадок.
Журналы и метрики кластера Azure Databricks предоставляют подробные сведения о производительности кластера и использовании ресурсов. К ним относятся использование ЦП и памяти, операции ввода-вывода диска и сетевой трафик. Мониторинг этих метрик имеет решающее значение для:
- Оптимизация производительности кластера.
- Эффективное управление ресурсами.
- Устранение неполадок в работе.
Метрики можно использовать с помощью пользовательского интерфейса Databricks или экспортировать в личные средства мониторинга. См . пример записной книжки: метрики Datadog.
Пользовательский интерфейс Spark
Пользовательский интерфейс Spark содержит подробные сведения о ходе выполнения заданий и этапов, включая количество задач, завершенных, ожидающих и неудачных. Это помогает понять поток выполнения и определить узкие места.
Для приложений потоковой передачи на вкладке "Потоковая передача " отображаются такие метрики, как скорость ввода, скорость обработки и длительность пакетной обработки. Он помогает отслеживать производительность заданий потоковой передачи и выявлять проблемы приема или обработки данных.
Дополнительные сведения см. в разделе "Отладка с помощью пользовательского интерфейса Spark ".
Метрики вычислений
Метрики вычислений помогут вам понять использование кластера. По мере выполнения задания вы можете увидеть, как оно масштабируется и как на него влияют ваши ресурсы. Вы сможете найти давление памяти, которое может привести к сбоям OOM или нагрузке ЦП, которые могут привести к длительным задержкам. Ниже приведены конкретные метрики, которые вы увидите:
- Распределение нагрузки сервера: загрузка ЦП каждого узла за последние минуты.
- Использование ЦП: процент времени, затраченного ЦП в различных режимах (например, пользователь, система, бездействие и айовайт).
- Использование памяти: общее использование памяти в каждом режиме (например, используется, свободно, буфер и кэширован).
- Использование буфера памяти: общее использование буфера памяти.
- Свободное пространство файловой системы: общее использование файловой системы по каждой точке подключения.
- Пропускная способность сети: количество полученных и передаваемых по сети байтов каждым устройством.
- Количество активных узлов: количество активных узлов при каждом метке времени для заданного вычисления.
Дополнительные сведения см. в разделе "Мониторинг производительности и аппаратных метрик ".
Системные таблицы
Мониторинг затрат
Системные таблицы Azure Databricks предоставляют структурированный подход для мониторинга затрат и производительности заданий. Эти таблицы включают:
- Сведения о выполнении задания.
- Использование ресурсов.
- Связанные затраты.
Используйте эти таблицы для понимания работоспособности операций и финансового влияния.
Requirements
Чтобы использовать системные таблицы для мониторинга затрат:
- Администратор учетной записи должен включить
system.lakeflow schema. - Пользователи должны либо:
- Быть администратором хранилища метаданных и администратором учетной записи или
- Иметь разрешения
USEиSELECTна системные схемы.
Пример запроса: самые дорогие задания (последние 30 дней)
Этот запрос определяет самые дорогие задания за последние 30 дней, помогая в анализе затрат и оптимизации.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Декларативные конвейеры Lakeflow Spark
Журнал событий Lakeflow Spark Declarative Pipelines фиксирует полную запись всех событий конвейера, в том числе:
- Журналы аудита.
- Проверки качества данных.
- Ход выполнения конвейера.
- Происхождение данных.
Журнал событий автоматически включен для всех декларативных конвейеров Spark Lakeflow и доступен через:
- Пользовательский интерфейс конвейера: просмотр журналов напрямую.
- API конвейеров: программный доступ.
- Прямой запрос: запрос таблицы журнала событий.
Дополнительные сведения см. в схеме журнала событий для декларативных конвейеров Spark Lakeflow.
Примеры запросов
В этих примерах запросы помогают отслеживать производительность и работоспособность конвейеров, предоставляя ключевые метрики, такие как длительность пакетов, пропускная способность, обратное давление и использование ресурсов систем.
Средняя длительность партии
Этот запрос вычисляет среднюю длительность пакетов, обработанных конвейером.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Средняя пропускная способность
Этот запрос вычисляет среднюю пропускную способность конвейера с точки зрения обработанных строк в секунду.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Этот запрос измеряет обратное давление потока данных, проверяя накопление данных.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Использование кластеров и слотов
Этот запрос содержит аналитические сведения об использовании кластеров или слотов, используемых конвейером.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Вы можете отслеживать потоковые запросы в заданиях с помощью прослушивателя потоковых запросов.
Подключите прослушиватель к сеансу Spark, чтобы включить прослушиватель потоковых запросов вAzure Databricks. Этот слушатель будет отслеживать ход и метрики потоковых запросов. Его можно использовать для отправки метрик во внешние средства мониторинга или их записи для дальнейшего анализа.
Пример. Экспорт метрик во внешние средства мониторинга
Note
Это доступно в Databricks Runtime 11.3 LTS и выше для Python и Scala.
Метрики потоковой передачи можно экспортировать во внешние службы для оповещения или мониторинга с помощью StreamingQueryListener интерфейса.
Ниже приведен базовый пример реализации прослушивателя:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Пример. Использование прослушивателя запросов в Azure Databricks
Ниже приведен пример журнала событий StreamingQueryListener для запроса потоковой передачи Kafka в Delta Lake:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Дополнительные примеры см. в следующих примерах: Примеры.
Метрики хода выполнения запроса
Метрики хода выполнения запросов необходимы для мониторинга производительности и работоспособности потоковых запросов. Эти метрики включают количество входных строк, скорость обработки и различные продолжительности, связанные с выполнением запроса. Эти метрики можно наблюдать, прикрепив StreamingQueryListener к сеансу Spark. Прослушиватель будет выдавать события, содержащие эти метрики, в конце каждого этапа потоковой передачи.
Например, можно получить доступ к метрикам с помощью StreamingQueryProgress.observedMetrics карты в методе прослушивателя onQueryProgress . Это позволяет отслеживать и анализировать производительность потоковых запросов в режиме реального времени.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)