Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
A streamelési alkalmazások teljesítményének, költségeinek és állapotának monitorozása elengedhetetlen a megbízható és hatékony ETL-folyamatok létrehozásához. Az Azure Databricks a feladatok, a Lakeflow Spark deklaratív folyamatok és a Lakeflow Connect számos megfigyelhetőségi funkcióját biztosítja a szűk keresztmetszetek diagnosztizálásához, a teljesítmény optimalizálásához és az erőforrás-használat és a költségek kezeléséhez.
Ez a cikk a következő területeken ismerteti az ajánlott eljárásokat:
- Főbb streamelési teljesítménymetrikák
- Eseménynapló-sémák és példa-lekérdezések
- Streamelési lekérdezés monitorozása
- Költségmegfigyelés rendszertáblák használatával
- Naplók és metrikák exportálása külső eszközökre
A streamelési megfigyelhetőség fő mérőszámai
Streamelési folyamatok futtatásakor figyelje a következő főbb metrikákat:
| Metric | Purpose |
|---|---|
| Backpressure | A fájlok és méretek (offsetek) számát figyeli. Segít azonosítani a szűk keresztmetszeteket, és biztosítja, hogy a rendszer anélkül tudja kezelni a bejövő adatokat, hogy lemarad. |
| Throughput | Nyomon követi a mikro kötegenként feldolgozott üzenetek számát. Értékelje a folyamat hatékonyságát, és ellenőrizze, hogy lépést tart-e az adatbetöltéssel. |
| Duration | Egy mikroköteg átlagos időtartamát méri. A feldolgozási sebességet jelzi, és segít a kötegek időközeinek finomhangolásában. |
| Latency | Azt jelzi, hogy a rendszer hány rekordot/üzenetet dolgoz fel az idő függvényében. Segít megérteni a végpontok közötti folyamatkéséseket, és optimalizálni az alacsonyabb késésekre. |
| Fürt kihasználtsága | A processzor- és memóriahasználatot (%) tükrözi. Biztosítja a hatékony erőforrás-használatot, és segít a fürtök méretezésében a feldolgozási igényeknek megfelelően. |
| Network | Az átvitt és fogadott adatokat méri. Hasznos a hálózati szűk keresztmetszetek azonosításához és az adatátvitel teljesítményének javításához. |
| Checkpoint | Azonosítja a feldolgozott adatokat és az eltolásokat. Biztosítja a konzisztenciát, és lehetővé teszi a hibatűrést a hibák során. |
| Cost | Egy streamelési alkalmazás óránkénti, napi és havi költségeit jeleníti meg. A költségvetés és az erőforrás-optimalizálás segédeszköze. |
| Lineage | Megjeleníti a streamelési alkalmazásban létrehozott adathalmazokat és rétegeket. Elősegíti az adatok átalakítását, nyomon követését, minőségbiztosítását és hibakeresését. |
Fürtnaplók és metrikák
Az Azure Databricks fürt naplók és mutatók részletes betekintést nyújtanak a fürt teljesítményébe és kihasználtságába. Ezek a naplók és metrikák a cpu-ról, a memóriáról, a lemez I/O-járól, a hálózati forgalomról és más rendszermetrikákról tartalmaznak információkat. Ezeknek a metrikáknak a monitorozása elengedhetetlen a fürt teljesítményének optimalizálásához, az erőforrások hatékony kezeléséhez és a problémák elhárításához.
Az Azure Databricks-fürtnaplók és -metrikák részletes betekintést nyújtanak a fürt teljesítményébe és erőforrás-kihasználtságába. Ezek közé tartozik a processzor- és memóriahasználat, a lemez I/O-ja és a hálózati forgalom. Az alábbi metrikák monitorozása kritikus fontosságú a következőkhöz:
- A klaszter teljesítményének optimalizálása.
- Erőforrások hatékony kezelése.
- Működési problémák elhárítása.
A metrikák a Databricks felhasználói felületén keresztül használhatók, vagy exportálhatók személyes monitorozási eszközökre. Lásd a jegyzetfüzet példáját: Datadog-metrikák.
Spark felhasználói felület
A Spark felhasználói felülete részletes információkat jelenít meg a feladatok és szakaszok előrehaladásáról, beleértve a befejezett, függőben lévő és sikertelen tevékenységek számát. Ez segít megérteni a végrehajtási folyamatot, és azonosítani a szűk keresztmetszeteket.
Streamelési alkalmazások esetén a Stream lap olyan metrikákat jelenít meg, mint a bemeneti sebesség, a feldolgozási sebesség és a köteg időtartama. Segít monitorozni a streamelési feladatok teljesítményét, és azonosítani az adatbetöltési vagy feldolgozási problémákat.
További információ : Hibakeresés a Spark felhasználói felületén .
Számítási metrikák
A számítási metrikák segítenek megérteni a fürt kihasználtságát. A feladat futtatásakor láthatja, hogyan skálázódik, és hogyan érinti az erőforrásokat. Megtalálhatja a memóriaterhelést, amely OOM-hibákhoz vagy hosszú késéseket okozó processzorterheléshez vezethet. Az alábbiakban a következő metrikákat fogja látni:
- Kiszolgálói terheléselosztás: Minden csomópont cpu-kihasználtsága az elmúlt percben.
- CPU-kihasználtság: A processzor különböző módokon (például felhasználó, rendszer, tétlen és iowait) töltött idő százalékos aránya.
- Memória kihasználtsága: Minden mód (például használt, szabad, pufferelt és gyorsítótárazott) teljes memóriahasználata.
- Memória felcserélése kihasználtsága: A memória felcserélése teljes kihasználtsága.
- Szabad fájlrendszerterület: A fájlrendszerek teljes kihasználtsága az egyes csatlakoztatási pontok szerint.
- Hálózati átviteli sebesség: Az egyes eszközök által a hálózaton keresztül fogadott és továbbított bájtok száma.
- Aktív csomópontok száma: Az aktív csomópontok száma az adott számítás minden időbélyegén.
További információt a Teljesítmény és hardver metrikadiagramokmonitorozása című témakörben talál.
Rendszertáblák
Költségfigyelés
Az Azure Databricks rendszertáblái strukturált megközelítést biztosítanak a feladatok költségeinek és teljesítményének monitorozásához. Ezek a táblázatok a következőket tartalmazzák:
- Feladat végrehajtásának részletei
- Erőforrás-kihasználtság.
- Kapcsolódó költségek.
Ezekkel a táblázatokkal megismerheti a működési állapotot és a pénzügyi hatásokat.
Requirements
Rendszertáblák használata a költségfigyeléshez:
- A fiókadminisztrátornak engedélyeznie kell a
system.lakeflow schema. - A felhasználóknak a következőkre van szükség:
- Legyen egyszerre metaadattár-rendszergazda és fiókadminisztrátor, vagy
- A rendszerséma
USEésSELECTengedélyekkel rendelkezik.
Példa lekérdezés: A legdrágább feladatok (az elmúlt 30 napban)
Ez a lekérdezés az elmúlt 30 nap legdrágább feladatait azonosítja, ami segít a költségelemzésben és az optimalizálásban.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Lakeflow Spark deklaratív csővezetékek
A Lakeflow Spark Deklaratív folyamatok eseménynaplója az összes folyamatesemény átfogó rekordját rögzíti, beleértve a következőket:
- Ellenőrzési naplók
- Adatminőség-ellenőrzések.
- Csővezeték haladása.
- Adatkeletkezési lánc.
Az eseménynapló automatikusan engedélyezve van az összes Lakeflow Spark deklaratív folyamathoz, és a következőn keresztül érhető el:
- Pipeline kezelőfelület: Naplók közvetlen megtekintése.
- Pipelines API: Programozott hozzáférés.
- Közvetlen lekérdezés: Az eseménynapló-tábla lekérdezése.
További információkért tekintse meg a Lakeflow Spark Deklaratív folyamatok eseménynapló-sémáját.
Példa lekérdezések
Ezek a példa-lekérdezések segítenek a folyamatok teljesítményének és állapotának monitorozásában olyan kulcsfontosságú metrikák biztosításával, mint a köteg időtartama, az átviteli sebesség, a háttérnyomás és az erőforrás-használat.
Köteg átlagos időtartama
Ez a lekérdezés kiszámítja a folyamat által feldolgozott kötegek átlagos időtartamát.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Átlagos átviteli sebesség
Ez a lekérdezés kiszámítja a folyamat átlagos átviteli sebességét a feldolgozott sorok másodpercenkénti száma alapján.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Ez a lekérdezés az adat-hátralék ellenőrzésével méri a folyamat háttérnyomását.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Fürt és foglalatok kihasználtsága
Ez a lekérdezés betekintést nyújt a csővezeték által használt fürtök vagy helyek kihasználtságába.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
A streamelési lekérdezéseket a feladatokban a Streamlekérdezés figyelőjével figyelheti.
Csatlakoztasson egy figyelőt a Spark-munkamenethez az Azure Databricks rendszerében a Streaming Query Listener engedélyezéséhez. Ez a figyelő figyeli a streamelési lekérdezések előrehaladását és metrikáit. Használható metrikák külső monitorozási eszközökre való leküldésére vagy további elemzés céljából történő naplózására.
Példa: Metrikák exportálása külső monitorozási eszközökre
Note
Ez a Databricks Runtime 11.3 LTS-ben és újabb verziókban érhető el Pythonhoz és Scalához.
A streamelési metrikákat külső szolgáltatásokba exportálhatja riasztás vagy irányítópult-készítés céljából az StreamingQueryListener interfész használatával.
Íme egy egyszerű példa a figyelő implementálására:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Példa: Lekérdezésfigyelő használata az Azure Databricksben
Az alábbiakban egy példa látható egy StreamingQueryListener eseménynaplóra, amely egy Kafka és Delta Lake közötti streamelési lekérdezéshez készült.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
További példákért lásd: Példák.
Lekérdezési folyamat mérőszámai
A lekérdezések előrehaladási mérőszámai elengedhetetlenek a streamelési lekérdezések teljesítményének és állapotának monitorozásához. Ezek a metrikák tartalmazzák a bemeneti sorok számát, a feldolgozási sebességeket és a lekérdezés végrehajtásához kapcsolódó különböző időtartamokat. Ezeket a metrikákat egy StreamingQueryListener csatolásával a Spark-munkamenethez, figyelhetjük meg. A figyelő az egyes streamelési korszakok végén bocsát ki olyan eseményeket, amelyek tartalmazzák ezeket a metrikákat.
A metrikákat például a figyelő által használt metódusban, a StreamingQueryProgress.observedMetrics térkép használatával érheti el onQueryProgress. Ez lehetővé teszi a streamelési lekérdezések teljesítményének valós idejű nyomon követését és elemzését.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)