Monitorare le pipeline Delta Live Tables

Questo articolo descrive come usare funzionalità predefinite di monitoraggio e osservabilità per le pipeline di tabelle live Delta, tra cui derivazione dei dati, cronologia degli aggiornamenti e report sulla qualità dei dati.

È possibile esaminare manualmente la maggior parte dei dati di monitoraggio tramite l'interfaccia utente dei dettagli della pipeline. Alcune attività sono più facili da eseguire eseguendo query sui metadati del registro eventi. Vedere Che cos'è il registro eventi di Delta Live Tables?.

Quali dettagli della pipeline sono disponibili nell'interfaccia utente?

Il grafico della pipeline viene visualizzato non appena un aggiornamento a una pipeline è stato avviato correttamente. Le frecce rappresentano le dipendenze tra set di dati nella pipeline. Per impostazione predefinita, la pagina dei dettagli della pipeline mostra l'aggiornamento più recente per la tabella, ma è possibile selezionare gli aggiornamenti meno recenti da un menu a discesa.

I dettagli visualizzati includono l'ID pipeline, le librerie di origine, il costo di calcolo, l'edizione del prodotto e il canale configurato per la pipeline.

Per visualizzare una visualizzazione tabulare dei set di dati, fare clic sulla scheda Elenco . La visualizzazione Elenco consente di visualizzare tutti i set di dati nella pipeline rappresentati come riga in una tabella ed è utile quando il DAG della pipeline è troppo grande per visualizzare nella visualizzazione Grafico . È possibile controllare i set di dati visualizzati nella tabella usando più filtri, ad esempio nome del set di dati, tipo e stato. Per tornare alla visualizzazione DAG, fare clic su Grafico.

L'utente Esegui come utente è il proprietario della pipeline e gli aggiornamenti della pipeline vengono eseguiti con le autorizzazioni di questo utente. Per modificare l'utente run as , fare clic su Autorizzazioni e modificare il proprietario della pipeline.

Come è possibile visualizzare i dettagli del set di dati?

Facendo clic su un set di dati nell'elenco del grafico o del set di dati della pipeline vengono visualizzati i dettagli sul set di dati. I dettagli includono lo schema del set di dati, le metriche sulla qualità dei dati e un collegamento al codice sorgente che definisce il set di dati.

Visualizzare la cronologia degli aggiornamenti

Per visualizzare la cronologia e lo stato degli aggiornamenti della pipeline, fare clic sul menu a discesa Cronologia aggiornamenti nella barra superiore.

Per visualizzare il grafico, i dettagli e gli eventi per un aggiornamento, selezionare l'aggiornamento nel menu a discesa. Per tornare all'aggiornamento più recente, fare clic su Mostra l'aggiornamento più recente.

Ottenere notifiche per gli eventi della pipeline

Per ricevere notifiche in tempo reale per gli eventi delle pipeline, ad esempio il completamento corretto di un aggiornamento delle pipeline o un errore di un aggiornamento delle pipeline, selezionare Aggiungi notifiche tramite posta elettronica per gli eventi della pipeline quando si crea o si modifica una pipeline.

Che cos'è il registro eventi di Delta Live Tables?

Il registro eventi di DLT (Delta Live Table) contiene tutte le informazioni correlate a una pipeline, tra cui log di audit, controlli della qualità dei dati, stato della pipeline e derivazione dei dati. È possibile usare il registro eventi per tenere traccia, comprendere e monitorare lo stato delle pipeline di dati.

È possibile visualizzare le voci del registro eventi nell'interfaccia utente di DLT, nell'API DLT o eseguendo direttamente una query sul registro eventi. Questa sezione è incentrata sull'esecuzione di query direttamente sul registro eventi.

È anche possibile definire azioni personalizzate da eseguire quando vengono registrati eventi, ad esempio l'invio di avvisi, con hook di eventi.

Schema del registro eventi

Nella tabella seguente viene descritto lo schema del registro eventi. Alcuni di questi campi contengono dati JSON che richiedono l'analisi per eseguire alcune query, ad esempio il details campo . Azure Databricks supporta l'operatore : per analizzare i campi JSON. Vedere l'operatore : (segno di due punti).

Campo Descrizione
id Identificatore univoco per il record del registro eventi.
sequence Documento JSON contenente i metadati per identificare e ordinare gli eventi.
origin Documento JSON contenente i metadati per l'origine dell'evento, ad esempio il provider di servizi cloud, l'area del provider di servizi cloud, user_id, pipeline_ido pipeline_type per mostrare dove è stata creata la pipeline, DBSQL o WORKSPACE.
timestamp Ora in cui è stato registrato l'evento.
message Messaggio leggibile che descrive l'evento.
level Tipo di evento, ad esempio , INFOWARN, ERRORo METRICS.
error Se si è verificato un errore, vengono descritti in dettaglio l'errore.
details Documento JSON contenente i dettagli strutturati dell'evento. Questo è il campo principale usato per l'analisi degli eventi.
event_type Tipo di evento.
maturity_level Stabilità dello schema di eventi. I valori possibili sono:

* STABLE: lo schema è stabile e non cambierà.
* NULL: lo schema è stabile e non cambierà. Il valore può essere NULL se il record è stato creato prima dell'aggiunta del maturity_level campo (versione 2022.37).
* EVOLVING: lo schema non è stabile e può cambiare.
* DEPRECATED: lo schema è deprecato e il runtime di Tabelle live Delta potrebbe interrompere la produzione di questo evento in qualsiasi momento.

Esecuzione di query sul registro eventi

Il percorso del registro eventi e l'interfaccia per eseguire query sul registro eventi dipendono dal fatto che la pipeline sia configurata per l'uso del metastore Hive o del catalogo Unity.

Metastore Hive

Se la pipeline pubblica tabelle nel metastore Hive, il registro eventi viene archiviato nel /system/eventsstorage percorso. Ad esempio, se l'impostazione della pipeline storage è stata configurata come /Users/username/data, il registro eventi viene archiviato nel /Users/username/data/system/events percorso in DBFS.

Se l'impostazione storage non è stata configurata, il percorso predefinito del registro eventi si trova /pipelines/<pipeline-id>/system/events in DBFS. Ad esempio, se l'ID della pipeline è 91de5e48-35ed-11ec-8d3d-0242ac130003, il percorso di archiviazione è /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

È possibile creare una vista per semplificare l'esecuzione di query nel registro eventi. Nell'esempio seguente viene creata una vista temporanea denominata event_log_raw. Questa vista viene usata nelle query del log eventi di esempio incluse in questo articolo:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Sostituire <event-log-path> con il percorso del registro eventi.

Ogni istanza di un'esecuzione della pipeline viene chiamata aggiornamento. Spesso si vogliono estrarre informazioni per l'aggiornamento più recente. Eseguire la query seguente per trovare l'identificatore per l'aggiornamento più recente e salvarlo nella latest_update_id visualizzazione temporanea. Questa vista viene usata nelle query del log eventi di esempio incluse in questo articolo:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

È possibile eseguire query sul log eventi in un notebook di Azure Databricks o nell'editor SQL. Usare un notebook o l'editor SQL per eseguire le query del log eventi di esempio.

Catalogo unity

Se la pipeline pubblica tabelle in Unity Catalog, è necessario usare la event_logfunzione con valori di tabella (TVF) per recuperare il registro eventi per la pipeline. Per recuperare il registro eventi per una pipeline, passare l'ID pipeline o un nome di tabella al file TVF. Ad esempio, per recuperare i record del registro eventi per la pipeline con ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Per recuperare i record del log eventi per la pipeline che ha creato o possiede la tabella my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Per chiamare il file TVF, è necessario usare un cluster condiviso o un'istanza di SQL Warehouse. Ad esempio, è possibile usare un notebook collegato a un cluster condiviso o usare l'editor SQL connesso a un sql warehouse.

Per semplificare l'esecuzione di query sugli eventi per una pipeline, il proprietario della pipeline può creare una visualizzazione su event_log TVF. Nell'esempio seguente viene creata una visualizzazione sul registro eventi per una pipeline. Questa vista viene usata nelle query del log eventi di esempio incluse in questo articolo.

Nota

Il event_log valore TVF può essere chiamato solo dal proprietario della event_log pipeline e una vista creata tramite TVF può essere eseguita solo dal proprietario della pipeline. La visualizzazione non può essere condivisa con altri utenti.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Sostituire <pipeline-ID> con l'identificatore univoco per la pipeline delta live tables. È possibile trovare l'ID nel pannello Dettagli pipeline nell'interfaccia utente di Tabelle live Delta.

Ogni istanza di un'esecuzione della pipeline viene chiamata aggiornamento. Spesso si vogliono estrarre informazioni per l'aggiornamento più recente. Eseguire la query seguente per trovare l'identificatore per l'aggiornamento più recente e salvarlo nella latest_update_id visualizzazione temporanea. Questa vista viene usata nelle query del log eventi di esempio incluse in questo articolo:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Eseguire query sulle informazioni di derivazione dal registro eventi

Gli eventi contenenti informazioni sulla derivazione hanno il tipo di flow_definitionevento . L'oggetto details:flow_definition contiene e output_datasetinput_datasets definisce ogni relazione nel grafico.

È possibile usare la query seguente per estrarre i set di dati di input e output per visualizzare le informazioni di derivazione:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

Eseguire query sulla qualità dei dati dal registro eventi

Se si definiscono le aspettative sui set di dati nella pipeline, le metriche relative alla qualità dei dati vengono archiviate nell'oggetto details:flow_progress.data_quality.expectations . Gli eventi contenenti informazioni sulla qualità dei dati hanno il tipo di flow_progressevento . L'esempio seguente esegue una query sulle metriche relative alla qualità dei dati per l'ultimo aggiornamento della pipeline:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

Monitorare il backlog dei dati eseguendo una query sul registro eventi

Delta Live Tables tiene traccia della quantità di dati presenti nel backlog nell'oggetto details:flow_progress.metrics.backlog_bytes . Gli eventi contenenti le metriche di backlog hanno il tipo di flow_progressevento . Nell'esempio seguente vengono eseguite query sulle metriche del backlog per l'ultimo aggiornamento della pipeline:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Nota

Le metriche del backlog potrebbero non essere disponibili a seconda del tipo di origine dati della pipeline e della versione di Databricks Runtime.

Monitorare gli eventi di scalabilità automatica avanzata dal registro eventi

Il registro eventi acquisisce il ridimensionamento del cluster quando la scalabilità automatica avanzata è abilitata nelle pipeline. Gli eventi contenenti informazioni sulla scalabilità automatica avanzata hanno il tipo di autoscaleevento . Le informazioni sulla richiesta di ridimensionamento del cluster vengono archiviate nell'oggetto details:autoscale . L'esempio seguente esegue una query sulle richieste di ridimensionamento automatico avanzato del cluster per l'ultimo aggiornamento della pipeline:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Monitorare l'utilizzo delle risorse di calcolo

cluster_resources gli eventi forniscono metriche sul numero di slot di attività nel cluster, sulla quantità di slot di attività e sul numero di attività in attesa di essere pianificate.

Quando la scalabilità automatica avanzata è abilitata, cluster_resources gli eventi contengono anche metriche per l'algoritmo di scalabilità automatica, inclusi latest_requested_num_executors, e optimal_num_executors. Gli eventi mostrano anche lo stato dell'algoritmo come stati diversi, ad CLUSTER_AT_DESIRED_SIZEesempio , SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSe BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Queste informazioni possono essere visualizzate insieme agli eventi di scalabilità automatica per fornire un quadro generale della scalabilità automatica avanzata.

Nell'esempio seguente viene eseguita una query sulla cronologia delle dimensioni della coda di attività per l'ultimo aggiornamento della pipeline:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

L'esempio seguente esegue una query sulla cronologia di utilizzo per l'ultimo aggiornamento della pipeline:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

L'esempio seguente esegue una query sulla cronologia del conteggio executor, accompagnata da metriche disponibili solo per le pipeline di scalabilità automatica avanzata, incluso il numero di executor richiesti dall'algoritmo nella richiesta più recente, il numero ottimale di executor consigliati dall'algoritmo in base alle metriche più recenti e lo stato dell'algoritmo di scalabilità automatica:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Controllare le pipeline delle tabelle live delta

È possibile usare i record del registro eventi di DLT e altri log di audit di Azure Databricks per ottenere un quadro completo del modo in cui i dati vengono aggiornati in DLT.

DLT (Delta Live Table) usa le credenziali del proprietario della pipeline per eseguire gli aggiornamenti. È possibile modificare le credenziali usate aggiornando il proprietario della pipeline. DLT registra l'utente per le azioni sulla pipeline, tra cui creazione di pipeline, modifiche alla configurazione e attivazione di aggiornamenti.

Per informazioni di riferimento sugli eventi di controllo di Unity Catalog, vedere Eventi del catalogo Unity.

Eseguire query sulle azioni utente nel registro eventi

È possibile usare il registro eventi per controllare gli eventi, ad esempio le azioni dell'utente. Gli eventi contenenti informazioni sulle azioni utente hanno il tipo di user_actionevento .

Le informazioni sull'azione vengono archiviate nell'oggetto user_action nel details campo . Usare la query seguente per costruire un log di controllo degli eventi utente. Per creare la event_log_raw vista usata in questa query, vedere Esecuzione di query sul registro eventi.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

Informazioni sul runtime

È possibile visualizzare le informazioni di runtime per un aggiornamento della pipeline, ad esempio la versione di Databricks Runtime per l'aggiornamento:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11.0