Delen via


Waarneembaarheid in Azure Databricks voor taken, Lakeflow Spark-declaratieve pijplijnen en Lakeflow Connect

Het bewaken van de prestaties, kosten en status van uw streamingtoepassingen is essentieel voor het bouwen van betrouwbare, efficiënte ETL-pijplijnen. Azure Databricks biedt een uitgebreide set observability-functies in Jobs, Lakeflow Spark Declarative Pipelines en Lakeflow Connect om knelpunten te diagnosticeren, prestaties te optimaliseren en het gebruik van resources en kosten te beheren.

In dit artikel vindt u een overzicht van de aanbevolen procedures op de volgende gebieden:

  • Essentiële prestatiemetingen voor streaming
  • Schema's en voorbeeldquery's voor gebeurtenislogboeken
  • Monitoring van streaming query's
  • Waarneembaarheid van kosten met behulp van systeemtabellen
  • Logboeken en metrische gegevens exporteren naar externe hulpprogramma's

Belangrijke statistieken voor streaming-observeerbaarheid

Wanneer u streamingpijplijnen gebruikt, controleert u de volgende belangrijke metrische gegevens:

Metric Purpose
Backpressure Controleert het aantal bestanden en offsetwaarden (groottes). Helpt knelpunten te identificeren en zorgt ervoor dat het systeem binnenkomende gegevens kan verwerken zonder achter te komen.
Throughput Houdt het aantal berichten bij dat per microbatch wordt verwerkt. Evalueer de efficiëntie van de pijplijn en controleer of deze gelijke tred houdt met de gegevensopname.
Duration Meet de gemiddelde duur van een microbatch. Geeft de verwerkingssnelheid aan en helpt batchintervallen af te stemmen.
Latency Geeft aan hoeveel records/berichten in de loop van de tijd worden verwerkt. Helpt bij het begrijpen van end-to-end pijplijnvertragingen en het optimaliseren voor lagere latenties.
Clustergebruik Geeft het CPU- en geheugengebruik weer (%). Zorgt voor efficiënt resourcegebruik en helpt clusters te schalen om te voldoen aan de verwerkingsvereisten.
Network Meet gegevens die worden overgedragen en ontvangen. Handig voor het identificeren van netwerkknelpunten en het verbeteren van de prestaties van gegevensoverdracht.
Checkpoint Identificeert verwerkte gegevens en offsets. Zorgt voor consistentie en maakt fouttolerantie mogelijk tijdens fouten.
Cost Toont de uur-, dagelijkse en maandelijkse kosten van een streamingtoepassing. Hulpmiddelen voor budgettering en resourceoptimalisatie.
Lineage Geeft gegevenssets en lagen weer die zijn gemaakt in de streamingtoepassing. Vereenvoudigt gegevenstransformatie, tracering, kwaliteitscontrole en foutopsporing.

Clusterlogboeken en metrische gegevens

Azure Databricks-clusterlogboeken en metrische gegevens bieden gedetailleerde inzichten in clusterprestaties en -gebruik. Deze logboeken en metrische gegevens bevatten informatie over CPU, geheugen, schijf-I/O, netwerkverkeer en andere metrische systeemgegevens. Het bewaken van deze metrische gegevens is van cruciaal belang voor het optimaliseren van clusterprestaties, het efficiënt beheren van resources en het oplossen van problemen.

Azure Databricks-clusterlogboeken en metrische gegevens bieden gedetailleerde inzichten in clusterprestaties en resourcegebruik. Dit zijn onder andere CPU- en geheugengebruik, schijf-I/O en netwerkverkeer. Het bewaken van deze metrische gegevens is essentieel voor:

  • Clusterprestaties optimaliseren.
  • Resources efficiënt beheren.
  • Operationele problemen oplossen.

De metrische gegevens kunnen worden gebruikt via de Databricks-gebruikersinterface of worden geëxporteerd naar persoonlijke bewakingshulpprogramma's. Zie notebookvoorbeeld: metrische gegevens van Datadog.

Spark-gebruikersinterface

De Spark-gebruikersinterface bevat gedetailleerde informatie over de voortgang van taken en fasen, waaronder het aantal voltooide taken, wachtend en mislukt. Dit helpt u inzicht te hebben in de uitvoeringsstroom en knelpunten te identificeren.

Voor streamingtoepassingen worden op het tabblad Streaming metrische gegevens weergegeven, zoals invoersnelheid, verwerkingssnelheid en batchduur. Hiermee kunt u de prestaties van uw streamingtaken bewaken en eventuele problemen met gegevensopname of verwerking identificeren.

Zie Foutopsporing met de Spark-gebruikersinterface voor meer informatie.

Metrische gegevens berekenen

De metrische rekengegevens helpen u inzicht te krijgen in het clustergebruik. Tijdens het uitvoeren van uw taak kunt u zien hoe de schaal ervan is en hoe uw resources worden beïnvloed. U kunt geheugendruk tegenkomen die kan leiden tot fouten door gebrek aan geheugen, of CPU-druksituaties die lange vertragingen kunnen veroorzaken. Dit zijn de specifieke metrische gegevens die u ziet:

  • Distributie van serverbelasting: het CPU-gebruik van elk knooppunt in de afgelopen minuut.
  • CPU-gebruik: het percentage tijd dat de CPU in verschillende modi heeft besteed (bijvoorbeeld gebruiker, systeem, inactiviteit en iowait).
  • Geheugengebruik: Totaal geheugengebruik per modus (bijvoorbeeld gebruikt, vrij, buffer en cache).
  • Geheugenwisselingsgebruik: Totaal gebruik van geheugenwisselingen.
  • Vrije bestandssysteemruimte: Totaal gebruik van bestandssysteem door elk koppelpunt.
  • Netwerkdoorvoer: het aantal ontvangen en verzonden bytes via het netwerk door elk apparaat.
  • Aantal actieve knooppunten: het aantal actieve knooppunten op elke tijdstempel voor de opgegeven rekenkracht.

Zie Prestaties bewaken en Hardwaremetriekgrafieken voor meer informatie.

Systeemtabellen

Kostenbewaking

Azure Databricks-systeemtabellen bieden een gestructureerde benadering voor het bewaken van de kosten en prestaties van taken. Deze tabellen omvatten:

  • Details van taakuitvoering.
  • Resourcegebruik.
  • Gekoppelde kosten.

Gebruik deze tabellen om inzicht te hebben in de operationele status en de financiële impact.

Requirements

Systeemtabellen gebruiken voor kostenbewaking:

  • Een accountbeheerder moet het system.lakeflow schemainschakelen.
  • Gebruikers moeten één van de volgende dingen doen:
    • Zowel een metastore-beheerder als een accountbeheerder zijn, of
    • Beschikken over USE- en SELECT machtigingen voor de systeemschema's.

Voorbeeldquery: Duurste banen (afgelopen 30 dagen)

Deze query identificeert de duurste taken in de afgelopen 30 dagen, die helpen bij kostenanalyse en optimalisatie.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Declaratieve Pijplijnen van Lakeflow Spark

In het gebeurtenislogboek van Lakeflow Spark Declarative Pipelines wordt een uitgebreide record vastgelegd van alle pijplijn gebeurtenissen, waaronder:

  • Auditlogboeken.
  • Gegevenskwaliteitscontroles.
  • Voortgang van pijplijn.
  • Gegevensherkomst.

Het gebeurtenislogboek wordt automatisch ingeschakeld voor alle Lakeflow Spark-declaratieve pijplijnen en kan worden geopend via:

  • Pijplijngebruikersinterface: logboeken rechtstreeks weergeven.
  • Pijplijnen-API: Programmatische toegang.
  • Directe query: Voer een query uit op de tabel van het gebeurtenislogboek.

Zie het gebeurtenislogboekschema voor Lakeflow Spark-declaratieve pijplijnen voor meer informatie.

Voorbeeldquery's

Met deze voorbeeldquery's kunt u de prestaties en status van pijplijnen bewaken door belangrijke metrische gegevens op te geven, zoals batchduur, doorvoer, backpressure en resourcegebruik.

Gemiddelde batchduur

Deze query berekent de gemiddelde duur van batches die door de pijplijn worden verwerkt.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Gemiddelde doorvoer

Met deze query wordt de gemiddelde doorvoer van de pijplijn berekend in termen van verwerkte rijen per seconde.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Met deze query wordt de backpressure van de pijplijn berekend door de gegevensachterstand te controleren.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Cluster- en slotsgebruik

Deze query biedt inzicht in het gebruik van clusters of slots die door de pijplijn worden gebruikt.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

U kunt streamingqueries in taken bewaken via de Streaming Query Luisteraar.

Koppel een listener aan de Spark-sessie om de streamingquerylister inAzure Databricks in te schakelen. Deze listener bewaakt de voortgang en metrische gegevens van uw streamingquery's. Het kan worden gebruikt om metrische gegevens naar externe bewakingshulpprogramma's te pushen of om ze te registreren voor verdere analyse.

Voorbeeld: Metrische gegevens exporteren naar externe bewakingshulpprogramma's

Note

Dit is beschikbaar in Databricks Runtime 11.3 LTS en hoger voor Python en Scala.

U kunt streaminggegevens exporteren naar externe services voor waarschuwingen of dashboards met behulp van de StreamingQueryListener interface.

Hier volgt een eenvoudig voorbeeld van het implementeren van een listener:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Voorbeeld: Querylistener gebruiken in Azure Databricks

Hieronder ziet u een voorbeeld van een StreamingQueryListener-gebeurtenislogboek voor een Kafka naar Delta Lake-streamingquery:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Zie voor meer voorbeelden: Voorbeelden.

Metrische gegevens over voortgang van query's

Metrische gegevens over de voortgang van query's zijn essentieel voor het bewaken van de prestaties en status van uw streamingquery's. Deze metrische gegevens omvatten het aantal invoerrijen, verwerkingssnelheden en verschillende duur met betrekking tot de uitvoering van de query. U kunt deze metrische gegevens bekijken door een StreamingQueryListener aan de Spark-sessie toe te voegen. De listener verzendt gebeurtenissen die deze metrische gegevens bevatten aan het einde van elk streaming-epoch.

U kunt bijvoorbeeld metrische gegevens openen met behulp van de StreamingQueryProgress.observedMetrics kaart in de methode van de onQueryProgress listener. Hiermee kunt u de prestaties van uw streamingquery's in realtime bijhouden en analyseren.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)