Megosztás:


Az Azure Databricksben a feladatok, a Lakeflow Spark Deklaratív folyamatok és a Lakeflow Connect megfigyelhetősége

A streamelési alkalmazások teljesítményének, költségeinek és állapotának monitorozása elengedhetetlen a megbízható és hatékony ETL-folyamatok létrehozásához. Az Azure Databricks a feladatok, a Lakeflow Spark deklaratív folyamatok és a Lakeflow Connect számos megfigyelhetőségi funkcióját biztosítja a szűk keresztmetszetek diagnosztizálásához, a teljesítmény optimalizálásához és az erőforrás-használat és a költségek kezeléséhez.

Ez a cikk a következő területeken ismerteti az ajánlott eljárásokat:

  • Főbb streamelési teljesítménymetrikák
  • Eseménynapló-sémák és példa-lekérdezések
  • Streamelési lekérdezés monitorozása
  • Költségmegfigyelés rendszertáblák használatával
  • Naplók és metrikák exportálása külső eszközökre

A streamelési megfigyelhetőség fő mérőszámai

Streamelési folyamatok futtatásakor figyelje a következő főbb metrikákat:

Metric Purpose
Backpressure A fájlok és méretek (offsetek) számát figyeli. Segít azonosítani a szűk keresztmetszeteket, és biztosítja, hogy a rendszer anélkül tudja kezelni a bejövő adatokat, hogy lemarad.
Throughput Nyomon követi a mikro kötegenként feldolgozott üzenetek számát. Értékelje a folyamat hatékonyságát, és ellenőrizze, hogy lépést tart-e az adatbetöltéssel.
Duration Egy mikroköteg átlagos időtartamát méri. A feldolgozási sebességet jelzi, és segít a kötegek időközeinek finomhangolásában.
Latency Azt jelzi, hogy a rendszer hány rekordot/üzenetet dolgoz fel az idő függvényében. Segít megérteni a végpontok közötti folyamatkéséseket, és optimalizálni az alacsonyabb késésekre.
Fürt kihasználtsága A processzor- és memóriahasználatot (%) tükrözi. Biztosítja a hatékony erőforrás-használatot, és segít a fürtök méretezésében a feldolgozási igényeknek megfelelően.
Network Az átvitt és fogadott adatokat méri. Hasznos a hálózati szűk keresztmetszetek azonosításához és az adatátvitel teljesítményének javításához.
Checkpoint Azonosítja a feldolgozott adatokat és az eltolásokat. Biztosítja a konzisztenciát, és lehetővé teszi a hibatűrést a hibák során.
Cost Egy streamelési alkalmazás óránkénti, napi és havi költségeit jeleníti meg. A költségvetés és az erőforrás-optimalizálás segédeszköze.
Lineage Megjeleníti a streamelési alkalmazásban létrehozott adathalmazokat és rétegeket. Elősegíti az adatok átalakítását, nyomon követését, minőségbiztosítását és hibakeresését.

Fürtnaplók és metrikák

Az Azure Databricks fürt naplók és mutatók részletes betekintést nyújtanak a fürt teljesítményébe és kihasználtságába. Ezek a naplók és metrikák a cpu-ról, a memóriáról, a lemez I/O-járól, a hálózati forgalomról és más rendszermetrikákról tartalmaznak információkat. Ezeknek a metrikáknak a monitorozása elengedhetetlen a fürt teljesítményének optimalizálásához, az erőforrások hatékony kezeléséhez és a problémák elhárításához.

Az Azure Databricks-fürtnaplók és -metrikák részletes betekintést nyújtanak a fürt teljesítményébe és erőforrás-kihasználtságába. Ezek közé tartozik a processzor- és memóriahasználat, a lemez I/O-ja és a hálózati forgalom. Az alábbi metrikák monitorozása kritikus fontosságú a következőkhöz:

  • A klaszter teljesítményének optimalizálása.
  • Erőforrások hatékony kezelése.
  • Működési problémák elhárítása.

A metrikák a Databricks felhasználói felületén keresztül használhatók, vagy exportálhatók személyes monitorozási eszközökre. Lásd a jegyzetfüzet példáját: Datadog-metrikák.

Spark felhasználói felület

A Spark felhasználói felülete részletes információkat jelenít meg a feladatok és szakaszok előrehaladásáról, beleértve a befejezett, függőben lévő és sikertelen tevékenységek számát. Ez segít megérteni a végrehajtási folyamatot, és azonosítani a szűk keresztmetszeteket.

Streamelési alkalmazások esetén a Stream lap olyan metrikákat jelenít meg, mint a bemeneti sebesség, a feldolgozási sebesség és a köteg időtartama. Segít monitorozni a streamelési feladatok teljesítményét, és azonosítani az adatbetöltési vagy feldolgozási problémákat.

További információ : Hibakeresés a Spark felhasználói felületén .

Számítási metrikák

A számítási metrikák segítenek megérteni a fürt kihasználtságát. A feladat futtatásakor láthatja, hogyan skálázódik, és hogyan érinti az erőforrásokat. Megtalálhatja a memóriaterhelést, amely OOM-hibákhoz vagy hosszú késéseket okozó processzorterheléshez vezethet. Az alábbiakban a következő metrikákat fogja látni:

  • Kiszolgálói terheléselosztás: Minden csomópont cpu-kihasználtsága az elmúlt percben.
  • CPU-kihasználtság: A processzor különböző módokon (például felhasználó, rendszer, tétlen és iowait) töltött idő százalékos aránya.
  • Memória kihasználtsága: Minden mód (például használt, szabad, pufferelt és gyorsítótárazott) teljes memóriahasználata.
  • Memória felcserélése kihasználtsága: A memória felcserélése teljes kihasználtsága.
  • Szabad fájlrendszerterület: A fájlrendszerek teljes kihasználtsága az egyes csatlakoztatási pontok szerint.
  • Hálózati átviteli sebesség: Az egyes eszközök által a hálózaton keresztül fogadott és továbbított bájtok száma.
  • Aktív csomópontok száma: Az aktív csomópontok száma az adott számítás minden időbélyegén.

További információt a Teljesítmény és hardver metrikadiagramokmonitorozása című témakörben talál.

Rendszertáblák

Költségfigyelés

Az Azure Databricks rendszertáblái strukturált megközelítést biztosítanak a feladatok költségeinek és teljesítményének monitorozásához. Ezek a táblázatok a következőket tartalmazzák:

  • Feladat végrehajtásának részletei
  • Erőforrás-kihasználtság.
  • Kapcsolódó költségek.

Ezekkel a táblázatokkal megismerheti a működési állapotot és a pénzügyi hatásokat.

Requirements

Rendszertáblák használata a költségfigyeléshez:

  • A fiókadminisztrátornak engedélyeznie kell a system.lakeflow schema.
  • A felhasználóknak a következőkre van szükség:
    • Legyen egyszerre metaadattár-rendszergazda és fiókadminisztrátor, vagy
    • A rendszerséma USE és SELECT engedélyekkel rendelkezik.

Példa lekérdezés: A legdrágább feladatok (az elmúlt 30 napban)

Ez a lekérdezés az elmúlt 30 nap legdrágább feladatait azonosítja, ami segít a költségelemzésben és az optimalizálásban.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Lakeflow Spark deklaratív csővezetékek

A Lakeflow Spark Deklaratív folyamatok eseménynaplója az összes folyamatesemény átfogó rekordját rögzíti, beleértve a következőket:

  • Ellenőrzési naplók
  • Adatminőség-ellenőrzések.
  • Csővezeték haladása.
  • Adatkeletkezési lánc.

Az eseménynapló automatikusan engedélyezve van az összes Lakeflow Spark deklaratív folyamathoz, és a következőn keresztül érhető el:

  • Pipeline kezelőfelület: Naplók közvetlen megtekintése.
  • Pipelines API: Programozott hozzáférés.
  • Közvetlen lekérdezés: Az eseménynapló-tábla lekérdezése.

További információkért tekintse meg a Lakeflow Spark Deklaratív folyamatok eseménynapló-sémáját.

Példa lekérdezések

Ezek a példa-lekérdezések segítenek a folyamatok teljesítményének és állapotának monitorozásában olyan kulcsfontosságú metrikák biztosításával, mint a köteg időtartama, az átviteli sebesség, a háttérnyomás és az erőforrás-használat.

Köteg átlagos időtartama

Ez a lekérdezés kiszámítja a folyamat által feldolgozott kötegek átlagos időtartamát.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Átlagos átviteli sebesség

Ez a lekérdezés kiszámítja a folyamat átlagos átviteli sebességét a feldolgozott sorok másodpercenkénti száma alapján.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Ez a lekérdezés az adat-hátralék ellenőrzésével méri a folyamat háttérnyomását.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Fürt és foglalatok kihasználtsága

Ez a lekérdezés betekintést nyújt a csővezeték által használt fürtök vagy helyek kihasználtságába.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

A streamelési lekérdezéseket a feladatokban a Streamlekérdezés figyelőjével figyelheti.

Csatlakoztasson egy figyelőt a Spark-munkamenethez az Azure Databricks rendszerében a Streaming Query Listener engedélyezéséhez. Ez a figyelő figyeli a streamelési lekérdezések előrehaladását és metrikáit. Használható metrikák külső monitorozási eszközökre való leküldésére vagy további elemzés céljából történő naplózására.

Példa: Metrikák exportálása külső monitorozási eszközökre

Note

Ez a Databricks Runtime 11.3 LTS-ben és újabb verziókban érhető el Pythonhoz és Scalához.

A streamelési metrikákat külső szolgáltatásokba exportálhatja riasztás vagy irányítópult-készítés céljából az StreamingQueryListener interfész használatával.

Íme egy egyszerű példa a figyelő implementálására:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Példa: Lekérdezésfigyelő használata az Azure Databricksben

Az alábbiakban egy példa látható egy StreamingQueryListener eseménynaplóra, amely egy Kafka és Delta Lake közötti streamelési lekérdezéshez készült.

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

További példákért lásd: Példák.

Lekérdezési folyamat mérőszámai

A lekérdezések előrehaladási mérőszámai elengedhetetlenek a streamelési lekérdezések teljesítményének és állapotának monitorozásához. Ezek a metrikák tartalmazzák a bemeneti sorok számát, a feldolgozási sebességeket és a lekérdezés végrehajtásához kapcsolódó különböző időtartamokat. Ezeket a metrikákat egy StreamingQueryListener csatolásával a Spark-munkamenethez, figyelhetjük meg. A figyelő az egyes streamelési korszakok végén bocsát ki olyan eseményeket, amelyek tartalmazzák ezeket a metrikákat.

A metrikákat például a figyelő által használt metódusban, a StreamingQueryProgress.observedMetrics térkép használatával érheti el onQueryProgress. Ez lehetővé teszi a streamelési lekérdezések teljesítményének valós idejű nyomon követését és elemzését.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)