Поделиться через


Наблюдаемость в Azure Databricks для заданий, Декларативные конвейеры Lakeflow Spark и Lakeflow Connect

Мониторинг производительности, затрат и работоспособности приложений потоковой передачи необходим для создания надежных, эффективных конвейеров ETL. Azure Databricks предоставляет широкий набор функций наблюдаемости в заданиях, Декларативных конвейерах Lakeflow Spark и Lakeflow Connect для диагностики узких мест, оптимизации производительности и управления использованием ресурсов и затратами.

В этой статье описаны рекомендации в следующих областях:

  • Ключевые метрики производительности потоковой передачи
  • Схемы журналов событий и примеры запросов
  • Мониторинг потоковой обработки запросов
  • Наблюдаемость затрат с помощью системных таблиц
  • Экспорт журналов и метрик во внешние средства

Ключевые метрики для отслеживания потоковой передачи

При работе с конвейерами потоковой передачи отслеживайте следующие ключевые метрики:

Metric Purpose
Backpressure Отслеживает количество файлов и смещения (размеров). Помогает выявлять узкие места и гарантирует, что система может обрабатывать входящие данные без отставания.
Throughput Отслеживает количество сообщений, обработанных на микропакет. Оцените эффективность конвейера и убедитесь, что он удерживает темп с поглощением данных.
Duration Измеряет среднюю длительность микропартии. Указывает скорость обработки и помогает настроить интервалы пакетной обработки.
Latency Указывает, сколько записей и сообщений обрабатывается с течением времени. Помогает понять сквозные задержки конвейера и оптимизировать их для снижения задержки.
Использование кластера Отражает использование ЦП и памяти (%). Обеспечивает эффективное использование ресурсов и помогает масштабировать кластеры для удовлетворения требований к обработке.
Network Измерение переданных и полученных данных. Полезно для выявления узких мест сети и повышения производительности передачи данных.
Checkpoint Определяет обработанные данные и смещения. Обеспечивает согласованность и обеспечивает отказоустойчивость во время сбоев.
Cost Отображает почасовые, ежедневные и ежемесячные затраты на приложение потоковой передачи. Помогает в оптимизации бюджета и ресурсов.
Lineage Отображает наборы данных и слои, созданные в приложении потоковой передачи. Упрощает преобразование данных, отслеживание, обеспечение качества и отладку.

Журналы и метрики кластера

Журналы и метрики кластера Azure Databricks предоставляют подробные сведения о производительности и использовании кластера. Эти журналы и метрики включают сведения о ЦП, памяти, операций ввода-вывода диска, сетевом трафике и других системных метрик. Мониторинг этих метрик имеет решающее значение для оптимизации производительности кластера, эффективного управления ресурсами и устранения неполадок.

Журналы и метрики кластера Azure Databricks предоставляют подробные сведения о производительности кластера и использовании ресурсов. К ним относятся использование ЦП и памяти, операции ввода-вывода диска и сетевой трафик. Мониторинг этих метрик имеет решающее значение для:

  • Оптимизация производительности кластера.
  • Эффективное управление ресурсами.
  • Устранение неполадок в работе.

Метрики можно использовать с помощью пользовательского интерфейса Databricks или экспортировать в личные средства мониторинга. См . пример записной книжки: метрики Datadog.

Пользовательский интерфейс Spark

Пользовательский интерфейс Spark содержит подробные сведения о ходе выполнения заданий и этапов, включая количество задач, завершенных, ожидающих и неудачных. Это помогает понять поток выполнения и определить узкие места.

Для приложений потоковой передачи на вкладке "Потоковая передача " отображаются такие метрики, как скорость ввода, скорость обработки и длительность пакетной обработки. Он помогает отслеживать производительность заданий потоковой передачи и выявлять проблемы приема или обработки данных.

Дополнительные сведения см. в разделе "Отладка с помощью пользовательского интерфейса Spark ".

Метрики вычислений

Метрики вычислений помогут вам понять использование кластера. По мере выполнения задания вы можете увидеть, как оно масштабируется и как на него влияют ваши ресурсы. Вы сможете найти давление памяти, которое может привести к сбоям OOM или нагрузке ЦП, которые могут привести к длительным задержкам. Ниже приведены конкретные метрики, которые вы увидите:

  • Распределение нагрузки сервера: загрузка ЦП каждого узла за последние минуты.
  • Использование ЦП: процент времени, затраченного ЦП в различных режимах (например, пользователь, система, бездействие и айовайт).
  • Использование памяти: общее использование памяти в каждом режиме (например, используется, свободно, буфер и кэширован).
  • Использование буфера памяти: общее использование буфера памяти.
  • Свободное пространство файловой системы: общее использование файловой системы по каждой точке подключения.
  • Пропускная способность сети: количество полученных и передаваемых по сети байтов каждым устройством.
  • Количество активных узлов: количество активных узлов при каждом метке времени для заданного вычисления.

Дополнительные сведения см. в разделе "Мониторинг производительности и аппаратных метрик ".

Системные таблицы

Мониторинг затрат

Системные таблицы Azure Databricks предоставляют структурированный подход для мониторинга затрат и производительности заданий. Эти таблицы включают:

  • Сведения о выполнении задания.
  • Использование ресурсов.
  • Связанные затраты.

Используйте эти таблицы для понимания работоспособности операций и финансового влияния.

Requirements

Чтобы использовать системные таблицы для мониторинга затрат:

  • Администратор учетной записи должен включить system.lakeflow schema.
  • Пользователи должны либо:
    • Быть администратором хранилища метаданных и администратором учетной записи или
    • Иметь разрешения USE и SELECT на системные схемы.

Пример запроса: самые дорогие задания (последние 30 дней)

Этот запрос определяет самые дорогие задания за последние 30 дней, помогая в анализе затрат и оптимизации.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Декларативные конвейеры Lakeflow Spark

Журнал событий Lakeflow Spark Declarative Pipelines фиксирует полную запись всех событий конвейера, в том числе:

  • Журналы аудита.
  • Проверки качества данных.
  • Ход выполнения конвейера.
  • Происхождение данных.

Журнал событий автоматически включен для всех декларативных конвейеров Spark Lakeflow и доступен через:

  • Пользовательский интерфейс конвейера: просмотр журналов напрямую.
  • API конвейеров: программный доступ.
  • Прямой запрос: запрос таблицы журнала событий.

Дополнительные сведения см. в схеме журнала событий для декларативных конвейеров Spark Lakeflow.

Примеры запросов

В этих примерах запросы помогают отслеживать производительность и работоспособность конвейеров, предоставляя ключевые метрики, такие как длительность пакетов, пропускная способность, обратное давление и использование ресурсов систем.

Средняя длительность партии

Этот запрос вычисляет среднюю длительность пакетов, обработанных конвейером.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Средняя пропускная способность

Этот запрос вычисляет среднюю пропускную способность конвейера с точки зрения обработанных строк в секунду.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Этот запрос измеряет обратное давление потока данных, проверяя накопление данных.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Использование кластеров и слотов

Этот запрос содержит аналитические сведения об использовании кластеров или слотов, используемых конвейером.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Вы можете отслеживать потоковые запросы в заданиях с помощью прослушивателя потоковых запросов.

Подключите прослушиватель к сеансу Spark, чтобы включить прослушиватель потоковых запросов вAzure Databricks. Этот слушатель будет отслеживать ход и метрики потоковых запросов. Его можно использовать для отправки метрик во внешние средства мониторинга или их записи для дальнейшего анализа.

Пример. Экспорт метрик во внешние средства мониторинга

Note

Это доступно в Databricks Runtime 11.3 LTS и выше для Python и Scala.

Метрики потоковой передачи можно экспортировать во внешние службы для оповещения или мониторинга с помощью StreamingQueryListener интерфейса.

Ниже приведен базовый пример реализации прослушивателя:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Пример. Использование прослушивателя запросов в Azure Databricks

Ниже приведен пример журнала событий StreamingQueryListener для запроса потоковой передачи Kafka в Delta Lake:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Дополнительные примеры см. в следующих примерах: Примеры.

Метрики хода выполнения запроса

Метрики хода выполнения запросов необходимы для мониторинга производительности и работоспособности потоковых запросов. Эти метрики включают количество входных строк, скорость обработки и различные продолжительности, связанные с выполнением запроса. Эти метрики можно наблюдать, прикрепив StreamingQueryListener к сеансу Spark. Прослушиватель будет выдавать события, содержащие эти метрики, в конце каждого этапа потоковой передачи.

Например, можно получить доступ к метрикам с помощью StreamingQueryProgress.observedMetrics карты в методе прослушивателя onQueryProgress . Это позволяет отслеживать и анализировать производительность потоковых запросов в режиме реального времени.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)