Övervaka Delta Live Tables-pipelines

Den här artikeln beskriver hur du kan använda inbyggda funktioner i Delta Live Tables för övervakning och observerbarhet för pipelines, inklusive data härkomst, uppdateringshistorik och rapportering av datakvalitet.

Du kan granska de flesta övervakningsdata manuellt via användargränssnittet för pipelineinformation. Vissa uppgifter är enklare att utföra genom att köra frågor mot händelseloggens metadata. Se Vad är händelseloggen Delta Live Tables?.

Vilken pipelineinformation är tillgänglig i användargränssnittet?

Pipelinediagrammet visas så snart en uppdatering av en pipeline har startats. Pilar representerar beroenden mellan datauppsättningar i pipelinen. Som standard visar pipelineinformationssidan den senaste uppdateringen för tabellen, men du kan välja äldre uppdateringar från en nedrullningsbara meny.

Information som visas är pipeline-ID, källbibliotek, beräkningskostnad, produktversion och kanalen som konfigurerats för pipelinen.

Om du vill se en tabellvy över datauppsättningar klickar du på fliken Lista . Med listvyn kan du se alla datauppsättningar i pipelinen som representeras som en rad i en tabell och är användbart när din pipeline DAG är för stor för att visualisera i graph-vyn . Du kan styra de datauppsättningar som visas i tabellen med hjälp av flera filter, till exempel datauppsättningens namn, typ och status. Om du vill växla tillbaka till DAG-visualiseringen klickar du på Graph.

Kör som-användaren är pipelineägaren och pipelineuppdateringar körs med den här användarens behörigheter. Om du vill ändra run as användaren klickar du på Behörigheter och ändrar pipelineägaren.

Hur kan du visa datamängdsinformation?

Om du klickar på en datauppsättning i pipelinediagrammet eller datamängdslistan visas information om datamängden. Information omfattar datauppsättningsschemat, datakvalitetsmått och en länk tillbaka till källkoden som definierar datauppsättningen.

Visa uppdateringshistorik

Om du vill visa historik och status för pipelineuppdateringar klickar du på den nedrullningsbara menyn uppdateringshistorik i det övre fältet.

Om du vill visa grafen, informationen och händelserna för en uppdatering väljer du uppdateringen i den nedrullningsbara menyn. Om du vill återgå till den senaste uppdateringen klickar du på Visa den senaste uppdateringen.

Få meddelanden om pipelinehändelser

Om du vill ta emot meddelanden i realtid för pipelinehändelser, till exempel slutförande av en pipelineuppdatering eller fel vid en pipelineuppdatering, lägger du till Lägg till e-postaviseringar för pipelinehändelser när du skapar eller redigerar en pipeline.

Vad är händelseloggen Delta Live Tables?

Delta Live Tables-händelseloggen innehåller all information som rör en pipeline, inklusive granskningsloggar, datakvalitetskontroller, pipeline-förlopp och data härkomst. Du kan använda händelseloggen för att spåra, förstå och övervaka tillståndet för dina datapipelines.

Du kan visa händelseloggposter i Delta Live Tables-användargränssnittet, Delta Live Tables-API:et eller genom att fråga händelseloggen direkt. Den här artikeln fokuserar på att fråga händelseloggen direkt.

Du kan också definiera anpassade åtgärder som ska köras när händelser loggas, till exempel att skicka aviseringar med händelsekrokar.

Schema för händelselogg

I följande tabell beskrivs schemat för händelseloggen. Vissa av dessa fält innehåller JSON-data som kräver parsning för att utföra vissa frågor, till exempel fältet details . Azure Databricks stöder operatorn : för att parsa JSON-fält. Se operatorn : (kolontecken).

Fält beskrivning
id En unik identifierare för händelseloggposten.
sequence Ett JSON-dokument som innehåller metadata för att identifiera och beställa händelser.
origin Ett JSON-dokument som innehåller metadata för händelsens ursprung, till exempel molnleverantören, molnleverantörens region, user_id, pipeline_ideller pipeline_type för att visa var pipelinen skapades, antingen DBSQL eller WORKSPACE.
timestamp Den tid då händelsen spelades in.
message Ett läsbart meddelande som beskriver händelsen.
level Händelsetypen, till exempel INFO, WARN, ERROReller METRICS.
error Om ett fel har uppstått finns information som beskriver felet.
details Ett JSON-dokument som innehåller strukturerad information om händelsen. Det här är det primära fältet som används för att analysera händelser.
event_type Händelsetypen.
maturity_level Händelseschemats stabilitet. Möjliga värden är:

* STABLE: Schemat är stabilt och ändras inte.
* NULL: Schemat är stabilt och ändras inte. Värdet kan vara NULL om posten skapades innan fältet maturity_level lades till (version 2022.37).
* EVOLVING: Schemat är inte stabilt och kan ändras.
* DEPRECATED: Schemat är inaktuellt och Delta Live Tables-körningen kan sluta producera den här händelsen när som helst.

Köra frågor mot händelseloggen

Platsen för händelseloggen och gränssnittet för att fråga händelseloggen beror på om din pipeline är konfigurerad att använda Hive-metaarkivet eller Unity Catalog.

Hive-metaarkiv

Om din pipeline publicerar tabeller till Hive-metaarkivet lagras händelseloggen på /system/events platsen storage . Om du till exempel har konfigurerat pipelineinställningen storage som /Users/username/datalagras händelseloggen /Users/username/data/system/events i sökvägen i DBFS.

Om du inte har konfigurerat inställningen finns /pipelines/<pipeline-id>/system/events standardplatsen för händelseloggen storage i DBFS. Om till exempel ID:t för din pipeline är 91de5e48-35ed-11ec-8d3d-0242ac130003är /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/eventslagringsplatsen .

Du kan skapa en vy för att förenkla frågor mot händelseloggen. I följande exempel skapas en tillfällig vy med namnet event_log_raw. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Ersätt <event-log-path> med platsen för händelseloggen.

Varje instans av en pipelinekörning kallas för en uppdatering. Du vill ofta extrahera information för den senaste uppdateringen. Kör följande fråga för att hitta identifieraren för den senaste uppdateringen och spara den i den latest_update_id tillfälliga vyn. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Du kan köra frågor mot händelseloggen i en Azure Databricks-notebook-fil eller SQL-redigeraren. Använd en notebook-fil eller SQL-redigeraren för att köra exempelfrågorna i händelseloggen.

Unity-katalog

Om din pipeline publicerar tabeller till Unity Catalog måste du använda event_logfunktionen tabellvärde (TVF) för att hämta händelseloggen för pipelinen. Du hämtar händelseloggen för en pipeline genom att skicka pipeline-ID:t eller ett tabellnamn till TVF:n. Om du till exempel vill hämta händelseloggposterna för pipelinen med ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Så här hämtar du händelseloggposterna för pipelinen som skapade eller äger tabellen my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Om du vill anropa TVF måste du använda ett delat kluster eller ett SQL-lager. Du kan till exempel använda en notebook-fil som är kopplad till ett delat kluster eller använda SQL-redigeraren som är ansluten till ett SQL-lager.

För att förenkla frågehändelser för en pipeline kan pipelinens ägare skapa en vy över TVF:n event_log . I följande exempel skapas en vy över händelseloggen för en pipeline. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln.

Kommentar

event_log TVF:n kan bara anropas av pipelineägaren och en vy som skapats via event_log TVF:n kan endast efterfrågas av pipelineägaren. Det går inte att dela vyn med andra användare.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Ersätt <pipeline-ID> med den unika identifieraren för Delta Live Tables-pipelinen. Du hittar ID:t i panelen Pipelineinformation i Delta Live Tables-användargränssnittet.

Varje instans av en pipelinekörning kallas för en uppdatering. Du vill ofta extrahera information för den senaste uppdateringen. Kör följande fråga för att hitta identifieraren för den senaste uppdateringen och spara den i den latest_update_id tillfälliga vyn. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Fråga efter ursprungsinformation från händelseloggen

Händelser som innehåller information om ursprung har händelsetypen flow_definition. Objektet details:flow_definition innehåller output_dataset och input_datasets definierar varje relation i diagrammet.

Du kan använda följande fråga för att extrahera indata- och utdatauppsättningarna för att se ursprungsinformation:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

Fråga efter datakvalitet från händelseloggen

Om du definierar förväntningar på datauppsättningar i din pipeline lagras datakvalitetsmåtten details:flow_progress.data_quality.expectations i objektet. Händelser som innehåller information om datakvalitet har händelsetypen flow_progress. I följande exempel frågar vi datakvalitetsmåtten för den senaste pipelineuppdateringen:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

Övervaka kvarvarande data genom att fråga händelseloggen

Delta Live Tables spårar hur mycket data som finns i kvarvarande uppgifter i details:flow_progress.metrics.backlog_bytes objektet. Händelser som innehåller mått för kvarvarande uppgifter har händelsetypen flow_progress. I följande exempel frågar du eftersläpningsmått för den senaste pipelineuppdateringen:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Kommentar

Måtten för kvarvarande uppgifter kanske inte är tillgängliga beroende på pipelinens datakällatyp och Databricks Runtime-version.

Övervaka förbättrade autoskalningshändelser från händelseloggen

Händelseloggen samlar in klusterstorlekar när utökad autoskalning är aktiverat i dina pipelines. Händelser som innehåller information om förbättrad autoskalning har händelsetypen autoscale. Informationen om att ändra storlek på begäran lagras i details:autoscale objektet. I följande exempel efterfrågas att utökat autoskalningskluster ändrar storlek på begäranden för den senaste pipelineuppdateringen:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Övervaka beräkningsresursanvändning

cluster_resources händelser ger mått på antalet aktivitetsfack i klustret, hur mycket dessa aktivitetsfack används och hur många aktiviteter som väntar på att schemaläggas.

När förbättrad autoskalning är aktiverad cluster_resources innehåller händelser även mått för algoritmen för automatisk skalning, inklusive latest_requested_num_executors, och optimal_num_executors. Händelserna visar också status för algoritmen som olika tillstånd, till exempel CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSoch BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Den här informationen kan visas tillsammans med autoskalningshändelserna för att ge en övergripande bild av förbättrad autoskalning.

I följande exempel efterfrågas historiken för aktivitetsköstorleken för den senaste pipelineuppdateringen:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

I följande exempel efterfrågas användningshistoriken för den senaste pipelineuppdateringen:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

I följande exempel efterfrågas historiken för antalet exekutorer, tillsammans med mått som endast är tillgängliga för pipelines för utökad autoskalning, inklusive antalet exekutorer som begärdes av algoritmen i den senaste begäran, det optimala antalet köre som rekommenderas av algoritmen baserat på de senaste måtten och algoritmtillståndet för autoskalning:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Granska Delta Live Tables-pipelines

Du kan använda Delta Live Tables-händelseloggposter och andra Azure Databricks-granskningsloggar för att få en fullständig bild av hur data uppdateras i Delta Live Tables.

Delta Live Tables använder autentiseringsuppgifterna för pipelineägaren för att köra uppdateringar. Du kan ändra de autentiseringsuppgifter som används genom att uppdatera pipelineägaren. Delta Live Tables registrerar användaren för åtgärder på pipelinen, inklusive att skapa pipelinen, redigera till konfiguration och utlösa uppdateringar.

Se Unity Catalog-händelser för en referens till Unity Catalog-granskningshändelser.

Fråga användaråtgärder i händelseloggen

Du kan använda händelseloggen för att granska händelser, till exempel användaråtgärder. Händelser som innehåller information om användaråtgärder har händelsetypen user_action.

Information om åtgärden lagras i user_action objektet i fältet details . Använd följande fråga för att skapa en granskningslogg med användarhändelser. Information om hur event_log_raw du skapar vyn som används i den här frågan finns i Fråga efter händelseloggen.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

Körningsinformation

Du kan visa körningsinformation för en pipelineuppdatering, till exempel Databricks Runtime-versionen för uppdateringen:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11,0