Sdílet prostřednictvím


Pozorovatelnost v Azure Databricks pro úlohy, deklarativní kanály Lakeflow a Lakeflow Connect

Monitorování výkonu, nákladů a stavu streamovaných aplikací je nezbytné k vytváření spolehlivých a efektivních kanálů ETL. Azure Databricks poskytuje bohatou sadu funkcí pozorovatelnosti napříč úlohami, deklarativními kanály Lakeflow a Lakeflow Connect, které pomáhají diagnostikovat kritické body, optimalizovat výkon a spravovat využití a náklady na prostředky.

Tento článek popisuje osvědčené postupy v následujících oblastech:

  • Klíčové metriky výkonu streamování
  • Schémata protokolu událostí a ukázkové dotazy
  • Monitorování streamovacích dotazů
  • Pozorovatelnost nákladů pomocí systémových tabulek
  • Export protokolů a metrik do externích nástrojů

Klíčové metriky pro pozorovatelnost streamování

Při provozu streamovacích kanálů monitorujte následující klíčové metriky:

Metrický Účel
Zpětný tlak Monitoruje počet souborů a posunů (velikostí). Pomáhá identifikovat kritické body a zajišťuje, aby systém mohl zpracovávat příchozí data, aniž by zapadal.
Propustnost Sleduje počet zpráv zpracovaných v mikrodávce. Vyhodnoťte efektivitu kanálu a zkontrolujte, jestli udržuje krok s příjmem dat.
Doba trvání Měří průměrnou dobu trvání mikrodávky. Označuje rychlost zpracování a pomáhá ladit dávkové intervaly.
Latence Určuje, kolik záznamů a zpráv se zpracovává v průběhu času. Pomáhá pochopit zpoždění mezi koncovými kanály a optimalizovat je pro nižší latence.
Využití clusteru Odráží využití procesoru a paměti (%). Zajišťuje efektivní využití prostředků a pomáhá škálovat clustery tak, aby splňovaly požadavky na zpracování.
Síť Měří přenášená a přijatá data. Užitečné pro identifikaci kritických bodů sítě a zlepšení výkonu přenosu dat.
kontrolní bod Identifikuje zpracovávaná data a posuny. Zajišťuje konzistenci a umožňuje odolnost proti chybám během selhání.
Náklady Zobrazuje hodinovou, denní a měsíční náklady na streamovací aplikaci. Pomáhá při rozpočtování a optimalizaci zdrojů.
rodokmenu Zobrazí datové sady a vrstvy vytvořené v aplikaci streamování. Usnadňuje transformaci dat, kontrolu kvality, sledování a ladění.

Protokoly a metriky clusteru

Protokoly a metriky clusteru Azure Databricks poskytují podrobné přehledy o výkonu a využití clusteru. Mezi tyto protokoly a metriky patří informace o procesoru, paměti, vstupně-výstupních operacích disku, síťovém provozu a dalších systémových metrikách. Monitorování těchto metrik je zásadní pro optimalizaci výkonu clusteru, efektivní správu prostředků a řešení potíží.

Protokoly a metriky clusteru Azure Databricks nabízejí podrobné přehledy o výkonu clusteru a využití prostředků. Patří sem využití procesoru a paměti, vstupně-výstupní operace disku a síťový provoz. Monitorování těchto metrik je důležité pro:

  • Optimalizace výkonu clusteru
  • Efektivní správa prostředků
  • Řešení provozních problémů

Metriky je možné využít prostřednictvím uživatelského rozhraní Databricks nebo exportovat do osobních monitorovacích nástrojů. Viz příklad poznámkového bloku: Metriky služby Datadog.

Uživatelské rozhraní Sparku

Uživatelské rozhraní Sparku zobrazuje podrobné informace o průběhu úloh a fází, včetně počtu dokončených, čekajících a neúspěšných úkolů. To vám pomůže pochopit tok provádění a identifikovat kritické body.

U streamovacích aplikací se na kartě Streamování zobrazují metriky, jako je vstupní rychlost, rychlost zpracování a doba trvání dávky. Pomáhá monitorovat výkon úloh streamování a identifikovat případné problémy s příjmem nebo zpracováním dat.

Další informace najdete v tématu Ladění pomocí uživatelského rozhraní Apache Spark .

Výpočetní metriky

Výpočetní metriky vám pomůžou porozumět využití clusteru. Při spuštění úlohy uvidíte, jak se škáluje a jak jsou vaše prostředky ovlivněny. Budete schopni najít zatížení paměti, které by mohlo vést k selhání kvůli nedostatku paměti (OOM), nebo zatížení procesoru, které by mohlo způsobit dlouhá zpoždění. Tady jsou konkrétní metriky, které uvidíte:

  • Distribuce zatížení serveru: Využití procesoru každého uzlu za poslední minutu
  • Využití procesoru: Procento času stráveného procesorem v různých režimech (například uživatel, systém, nečinný a iowait).
  • Využití paměti: Celkové využití paměti v jednotlivých režimech (například využité, volné, vyrovnávací paměti a ukládání do mezipaměti).
  • Využití paměti swapu: Celkové využití paměti swapu
  • Volné místo systému souborů: Celkové využití systému souborů každým přípojným bodem
  • Propustnost sítě: Počet přijatých a přenášených bajtů prostřednictvím sítě jednotlivými zařízeními.
  • Počet aktivních uzlů: Počet aktivních uzlů v každém časovém razítku daného výpočetního prostředí.

Další informace najdete v tématu Sledování výkonu a grafy metrik hardwaru.

Systémové tabulky

Monitorování nákladů

Systémové tabulky Azure Databricks poskytují strukturovaný přístup k monitorování nákladů na úlohy a výkonu. Mezi tyto tabulky patří:

  • Podrobnosti o spuštění úlohy
  • Využití prostředků
  • Přidružené náklady.

Tyto tabulky slouží k pochopení provozního stavu a finančního dopadu.

Požadavky

Použití systémových tabulek pro monitorování nákladů:

  • Správce účtu musí povolit system.lakeflow schema.
  • Uživatelé musí:
    • Být správcem metastoru i správcem účtu nebo
    • Mít oprávnění USE a SELECT k systémovým schématům.

Příklad dotazu: Nejnákladnější úlohy (posledních 30 dnů)

Tento dotaz identifikuje nejnákladnější úlohy za posledních 30 dnů, což pomáhá při analýze nákladů a optimalizaci.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Deklarativní kanály Lakeflow

Protokol událostí deklarativních kanálů Lakeflow zaznamenává komplexní záznam všech událostí kanálu, včetně:

  • Protokoly auditu
  • Kontroly kvality dat.
  • Postup projektu
  • Linage dat.

Protokol událostí je automaticky povolený pro všechny deklarativní kanály Lakeflow a dá se k němu přistupovat prostřednictvím:

  • Rozhraní Pipeline: Prohlížet protokoly přímo.
  • DLT API: Programový přístup.
  • Přímý dotaz: Zadejte dotaz na tabulku protokolu událostí.

Další informace najdete ve schématu protokolu událostí pro deklarativní kanály Lakeflow.

Příkladové dotazy

Tyto ukázkové dotazy pomáhají monitorovat výkon a stav datových toků tím, že poskytují klíčové metriky, jako je doba trvání dávky, propustnost, zpětný tlak a využití prostředků.

Průměrná doba trvání dávky

Tento dotaz vypočítá průměrnou dobu trvání dávek zpracovaných kanálem.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Průměrná propustnost

Tento dotaz vypočítá průměrnou propustnost kanálu z hlediska zpracovaných řádků za sekundu.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Zpětný tlak

Tento dotaz měří zpětný tlak kanálu kontrolou čekačky dat.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Využití clusterů a slotů

Tento dotaz poskytuje informace o využití clusterů nebo slotů, které používá systém.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Pracovní místa

Streamovací dotazy můžete sledovat v úlohách prostřednictvím Streamovacího Query Listeneru.

Připojte naslouchadlo k relaci Sparku a povolte naslouchadlo streamování dotazů v Azure Databricks. Tento posluchač bude monitorovat průběh a metriky vašich streamovaných dotazů. Dá se použít k nasdílení metrik externím monitorovacím nástrojům nebo k jejich protokolování pro další analýzu.

Příklad: Export metrik do externích monitorovacích nástrojů

:::poznámka

To je k dispozici v Databricks Runtime 11.3 LTS a vyšší pro Python a Scala.

:::

Streamované metriky můžete exportovat do externích služeb pro upozorňování nebo řídicí panel pomocí StreamingQueryListener rozhraní.

Tady je základní příklad, jak implementovat posluchač:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Příklad: Použití posluchače dotazu v Azure Databricks

Níže je příklad protokolu událostí StreamingQueryListener pro streamovací dotaz z Kafka do Delta Lake.

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Další příklady najdete v tématu: Příklady.

Metriky průběhu dotazů

Metriky průběhu dotazů jsou nezbytné pro monitorování výkonu a stavu streamovaných dotazů. Mezi tyto metriky patří počet vstupních řádků, rychlost zpracování a různé doby trvání související se spuštěním dotazu. Metriky můžete sledovat tím, že připojíte StreamingQueryListener k relaci Spark. Posluchač bude na konci každé epochy streamování vyvolávat události obsahující tyto metriky.

K metrikám můžete například přistupovat pomocí StreamingQueryProgress.observedMetrics mapy v metodě posluchače onQueryProgress. Díky tomu můžete sledovat a analyzovat výkon streamovaných dotazů v reálném čase.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)