Poznámka
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Monitorování výkonu, nákladů a stavu streamovaných aplikací je nezbytné k vytváření spolehlivých a efektivních kanálů ETL. Azure Databricks poskytuje bohatou sadu funkcí pozorovatelnosti napříč úlohami, deklarativními kanály Lakeflow a Lakeflow Connect, které pomáhají diagnostikovat kritické body, optimalizovat výkon a spravovat využití a náklady na prostředky.
Tento článek popisuje osvědčené postupy v následujících oblastech:
- Klíčové metriky výkonu streamování
- Schémata protokolu událostí a ukázkové dotazy
- Monitorování streamovacích dotazů
- Pozorovatelnost nákladů pomocí systémových tabulek
- Export protokolů a metrik do externích nástrojů
Klíčové metriky pro pozorovatelnost streamování
Při provozu streamovacích kanálů monitorujte následující klíčové metriky:
Metrický | Účel |
---|---|
Zpětný tlak | Monitoruje počet souborů a posunů (velikostí). Pomáhá identifikovat kritické body a zajišťuje, aby systém mohl zpracovávat příchozí data, aniž by zapadal. |
Propustnost | Sleduje počet zpráv zpracovaných v mikrodávce. Vyhodnoťte efektivitu kanálu a zkontrolujte, jestli udržuje krok s příjmem dat. |
Doba trvání | Měří průměrnou dobu trvání mikrodávky. Označuje rychlost zpracování a pomáhá ladit dávkové intervaly. |
Latence | Určuje, kolik záznamů a zpráv se zpracovává v průběhu času. Pomáhá pochopit zpoždění mezi koncovými kanály a optimalizovat je pro nižší latence. |
Využití clusteru | Odráží využití procesoru a paměti (%). Zajišťuje efektivní využití prostředků a pomáhá škálovat clustery tak, aby splňovaly požadavky na zpracování. |
Síť | Měří přenášená a přijatá data. Užitečné pro identifikaci kritických bodů sítě a zlepšení výkonu přenosu dat. |
kontrolní bod | Identifikuje zpracovávaná data a posuny. Zajišťuje konzistenci a umožňuje odolnost proti chybám během selhání. |
Náklady | Zobrazuje hodinovou, denní a měsíční náklady na streamovací aplikaci. Pomáhá při rozpočtování a optimalizaci zdrojů. |
rodokmenu | Zobrazí datové sady a vrstvy vytvořené v aplikaci streamování. Usnadňuje transformaci dat, kontrolu kvality, sledování a ladění. |
Protokoly a metriky clusteru
Protokoly a metriky clusteru Azure Databricks poskytují podrobné přehledy o výkonu a využití clusteru. Mezi tyto protokoly a metriky patří informace o procesoru, paměti, vstupně-výstupních operacích disku, síťovém provozu a dalších systémových metrikách. Monitorování těchto metrik je zásadní pro optimalizaci výkonu clusteru, efektivní správu prostředků a řešení potíží.
Protokoly a metriky clusteru Azure Databricks nabízejí podrobné přehledy o výkonu clusteru a využití prostředků. Patří sem využití procesoru a paměti, vstupně-výstupní operace disku a síťový provoz. Monitorování těchto metrik je důležité pro:
- Optimalizace výkonu clusteru
- Efektivní správa prostředků
- Řešení provozních problémů
Metriky je možné využít prostřednictvím uživatelského rozhraní Databricks nebo exportovat do osobních monitorovacích nástrojů. Viz příklad poznámkového bloku: Metriky služby Datadog.
Uživatelské rozhraní Sparku
Uživatelské rozhraní Sparku zobrazuje podrobné informace o průběhu úloh a fází, včetně počtu dokončených, čekajících a neúspěšných úkolů. To vám pomůže pochopit tok provádění a identifikovat kritické body.
U streamovacích aplikací se na kartě Streamování zobrazují metriky, jako je vstupní rychlost, rychlost zpracování a doba trvání dávky. Pomáhá monitorovat výkon úloh streamování a identifikovat případné problémy s příjmem nebo zpracováním dat.
Další informace najdete v tématu Ladění pomocí uživatelského rozhraní Apache Spark .
Výpočetní metriky
Výpočetní metriky vám pomůžou porozumět využití clusteru. Při spuštění úlohy uvidíte, jak se škáluje a jak jsou vaše prostředky ovlivněny. Budete schopni najít zatížení paměti, které by mohlo vést k selhání kvůli nedostatku paměti (OOM), nebo zatížení procesoru, které by mohlo způsobit dlouhá zpoždění. Tady jsou konkrétní metriky, které uvidíte:
- Distribuce zatížení serveru: Využití procesoru každého uzlu za poslední minutu
- Využití procesoru: Procento času stráveného procesorem v různých režimech (například uživatel, systém, nečinný a iowait).
- Využití paměti: Celkové využití paměti v jednotlivých režimech (například využité, volné, vyrovnávací paměti a ukládání do mezipaměti).
- Využití paměti swapu: Celkové využití paměti swapu
- Volné místo systému souborů: Celkové využití systému souborů každým přípojným bodem
- Propustnost sítě: Počet přijatých a přenášených bajtů prostřednictvím sítě jednotlivými zařízeními.
- Počet aktivních uzlů: Počet aktivních uzlů v každém časovém razítku daného výpočetního prostředí.
Další informace najdete v tématu Sledování výkonu a grafy metrik hardwaru.
Systémové tabulky
Monitorování nákladů
Systémové tabulky Azure Databricks poskytují strukturovaný přístup k monitorování nákladů na úlohy a výkonu. Mezi tyto tabulky patří:
- Podrobnosti o spuštění úlohy
- Využití prostředků
- Přidružené náklady.
Tyto tabulky slouží k pochopení provozního stavu a finančního dopadu.
Požadavky
Použití systémových tabulek pro monitorování nákladů:
- Správce účtu musí povolit
system.lakeflow schema
. - Uživatelé musí:
- Být správcem metastoru i správcem účtu nebo
- Mít oprávnění
USE
aSELECT
k systémovým schématům.
Příklad dotazu: Nejnákladnější úlohy (posledních 30 dnů)
Tento dotaz identifikuje nejnákladnější úlohy za posledních 30 dnů, což pomáhá při analýze nákladů a optimalizaci.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Deklarativní kanály Lakeflow
Protokol událostí deklarativních kanálů Lakeflow zaznamenává komplexní záznam všech událostí kanálu, včetně:
- Protokoly auditu
- Kontroly kvality dat.
- Postup projektu
- Linage dat.
Protokol událostí je automaticky povolený pro všechny deklarativní kanály Lakeflow a dá se k němu přistupovat prostřednictvím:
- Rozhraní Pipeline: Prohlížet protokoly přímo.
- DLT API: Programový přístup.
- Přímý dotaz: Zadejte dotaz na tabulku protokolu událostí.
Další informace najdete ve schématu protokolu událostí pro deklarativní kanály Lakeflow.
Příkladové dotazy
Tyto ukázkové dotazy pomáhají monitorovat výkon a stav datových toků tím, že poskytují klíčové metriky, jako je doba trvání dávky, propustnost, zpětný tlak a využití prostředků.
Průměrná doba trvání dávky
Tento dotaz vypočítá průměrnou dobu trvání dávek zpracovaných kanálem.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Průměrná propustnost
Tento dotaz vypočítá průměrnou propustnost kanálu z hlediska zpracovaných řádků za sekundu.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Zpětný tlak
Tento dotaz měří zpětný tlak kanálu kontrolou čekačky dat.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Využití clusterů a slotů
Tento dotaz poskytuje informace o využití clusterů nebo slotů, které používá systém.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Pracovní místa
Streamovací dotazy můžete sledovat v úlohách prostřednictvím Streamovacího Query Listeneru.
Připojte naslouchadlo k relaci Sparku a povolte naslouchadlo streamování dotazů v Azure Databricks. Tento posluchač bude monitorovat průběh a metriky vašich streamovaných dotazů. Dá se použít k nasdílení metrik externím monitorovacím nástrojům nebo k jejich protokolování pro další analýzu.
Příklad: Export metrik do externích monitorovacích nástrojů
:::poznámka
To je k dispozici v Databricks Runtime 11.3 LTS a vyšší pro Python a Scala.
:::
Streamované metriky můžete exportovat do externích služeb pro upozorňování nebo řídicí panel pomocí StreamingQueryListener
rozhraní.
Tady je základní příklad, jak implementovat posluchač:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Příklad: Použití posluchače dotazu v Azure Databricks
Níže je příklad protokolu událostí StreamingQueryListener pro streamovací dotaz z Kafka do Delta Lake.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Další příklady najdete v tématu: Příklady.
Metriky průběhu dotazů
Metriky průběhu dotazů jsou nezbytné pro monitorování výkonu a stavu streamovaných dotazů. Mezi tyto metriky patří počet vstupních řádků, rychlost zpracování a různé doby trvání související se spuštěním dotazu. Metriky můžete sledovat tím, že připojíte StreamingQueryListener
k relaci Spark. Posluchač bude na konci každé epochy streamování vyvolávat události obsahující tyto metriky.
K metrikám můžete například přistupovat pomocí StreamingQueryProgress.observedMetrics
mapy v metodě posluchače onQueryProgress
. Díky tomu můžete sledovat a analyzovat výkon streamovaných dotazů v reálném čase.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)