Esercitazione: Eseguire la prima pipeline di tabelle live Delta

Importante

Le pipeline DLT serverless sono disponibili in anteprima pubblica. Per informazioni sull'abilitazione delle pipeline DLT serverless, contattare il team dell'account di Azure Databricks.

Questa esercitazione illustra come configurare una pipeline di tabelle live Delta dal codice in un notebook di Databricks ed eseguire la pipeline attivando un aggiornamento della pipeline. Questa esercitazione include una pipeline di esempio per inserire ed elaborare un set di dati di esempio con codice di esempio usando le interfacce Python e SQL . È anche possibile usare le istruzioni riportate in questa esercitazione per creare una pipeline con qualsiasi notebook con una sintassi delta live tables definita correttamente.

È possibile configurare pipeline di tabelle live Delta e attivare gli aggiornamenti usando l'interfaccia utente dell'area di lavoro di Azure Databricks o opzioni di strumenti automatizzati, ad esempio l'API, l'interfaccia della riga di comando, i bundle di asset di Databricks o come attività in un flusso di lavoro di Databricks. Per acquisire familiarità con le funzionalità e le funzionalità delle tabelle live Delta, Databricks consiglia di usare prima l'interfaccia utente per creare ed eseguire pipeline. Inoltre, quando si configura una pipeline nell'interfaccia utente, Delta Live Tables genera una configurazione JSON per la pipeline che può essere usata per implementare i flussi di lavoro programmatici.

Per illustrare la funzionalità Tabelle live Delta, gli esempi di questa esercitazione scaricano un set di dati disponibile pubblicamente. Databricks offre tuttavia diversi modi per connettersi alle origini dati e inserire dati che implementano casi d'uso reali. Vedere Inserire dati con tabelle live Delta.

Requisiti

  • Per avviare una pipeline non serverless, è necessario disporre dell'autorizzazione di creazione del cluster o dell'accesso a un criterio del cluster che definisce un cluster Delta Live Tables. Il runtime di DLT crea un cluster prima di eseguire la pipeline e ha esito negativo se non si ha l'autorizzazione corretta.

  • Per usare gli esempi in questa esercitazione, l'area di lavoro deve avere Il catalogo Unity abilitato.

  • È necessario disporre delle autorizzazioni seguenti in Unity Catalog:

    • READ VOLUME e WRITE VOLUME, o ALL PRIVILEGESper il my-volume volume.
    • USE SCHEMA o ALL PRIVILEGES per lo default schema.
    • USE CATALOG oppure ALL PRIVILEGES per il main catalogo.

    Per impostare queste autorizzazioni, vedere l'amministratore di Databricks o i privilegi di Unity Catalog e gli oggetti a protezione diretta.

  • Gli esempi in questa esercitazione usano un volume di Unity Catalog per archiviare i dati di esempio. Per usare questi esempi, creare un volume e usare i nomi di catalogo, schema e volume del volume per impostare il percorso del volume usato dagli esempi.

Nota

Se l'area di lavoro non dispone di Unity Catalog abilitato, i notebook con esempi che non richiedono Il catalogo unity sono collegati a questo articolo. Per usare questi esempi, selezionare Hive metastore come opzione di archiviazione quando si crea la pipeline.

Dove si eseguono query delta live tables?

Le query Delta Live Tables vengono implementate principalmente nei notebook di Databricks, ma le tabelle live Delta non sono progettate per essere eseguite in modo interattivo nelle celle del notebook. L'esecuzione di una cella contenente la sintassi delle tabelle live Delta in un notebook di Databricks genera un messaggio di errore. Per eseguire le query, è necessario configurare i notebook come parte di una pipeline.

Importante

  • Non è possibile fare affidamento sull'ordinamento dell'esecuzione cella per cella dei notebook durante la scrittura di query per tabelle live Delta. Le tabelle live Delta valutano ed eseguono tutto il codice definito nei notebook, ma hanno un modello di esecuzione diverso da quello di un notebook Esegui tutto comando.
  • Non è possibile combinare le lingue in un singolo file di codice sorgente delle tabelle live Delta. Ad esempio, un notebook può contenere solo query Python o query SQL. Se è necessario usare più linguaggi in una pipeline, usare più notebook o file specifici del linguaggio nella pipeline.

È anche possibile usare il codice Python archiviato nei file. Ad esempio, è possibile creare un modulo Python che può essere importato nelle pipeline Python o definire funzioni definite dall'utente Python da usare nelle query SQL. Per informazioni sull'importazione di moduli Python, vedere Importare moduli Python da cartelle Git o file dell'area di lavoro. Per altre informazioni sull'uso delle funzioni definite dall'utente, vedere Funzioni scalari definite dall'utente - Python.

Esempio: Inserire ed elaborare i dati dei nomi dei bambini di New York

L'esempio in questo articolo usa un set di dati disponibile pubblicamente che contiene i record dei nomi dei bambini dello Stato di New York. Questi esempi illustrano l'uso di una pipeline di tabelle live Delta per:

  • Leggere i dati CSV non elaborati da un set di dati disponibile pubblicamente in una tabella.
  • Leggere i record dalla tabella dei dati non elaborati e usare le aspettative delle tabelle live Delta per creare una nuova tabella contenente dati puliti.
  • Usare i record puliti come input per le query Delta Live Tables che creano set di dati derivati.

Questo codice illustra un esempio semplificato dell'architettura medallion. Vedere Che cos'è l'architettura del lago medallion?.

Le implementazioni di questo esempio vengono fornite per le interfacce Python e SQL . È possibile seguire la procedura per creare nuovi notebook che contengono il codice di esempio oppure passare direttamente a Creare una pipeline e usare uno dei notebook forniti in questa pagina.

Implementare una pipeline di tabelle live Delta con Python

Il codice Python che crea set di dati delta live tables deve restituire dataframe, familiarità con gli utenti con PySpark o Pandas per l'esperienza Spark. Per gli utenti che non hanno familiarità con i dataframe, Databricks consiglia di usare l'interfaccia SQL. Vedere Implementare una pipeline di tabelle live Delta con SQL.

Tutte le API Python per le tabelle live Delta vengono implementate nel dlt modulo. Il codice della pipeline Delta Live Tables implementato con Python deve importare in modo esplicito il dlt modulo nella parte superiore di notebook e file Python. Le tabelle live delta differiscono da molti script Python in modo chiave: non si chiamano le funzioni che eseguono l'inserimento e la trasformazione dei dati per creare set di dati Delta Live Tables. Le tabelle live Delta interpretano invece le funzioni decorator del dlt modulo in tutti i file caricati in una pipeline e compila un grafico del flusso di dati.

Per implementare l'esempio in questa esercitazione, copiare e incollare il codice Python seguente in un nuovo notebook Python. È necessario aggiungere ogni frammento di codice di esempio alla propria cella nel notebook nell'ordine descritto. Per esaminare le opzioni per la creazione di notebook, vedere Creare un notebook.

Nota

Quando si crea una pipeline con l'interfaccia Python, per impostazione predefinita, i nomi delle tabelle vengono definiti dai nomi delle funzioni. L'esempio python seguente, ad esempio, crea tre tabelle denominate baby_names_raw, baby_names_preparede top_baby_names_2021. È possibile eseguire l'override del nome della tabella usando il name parametro . Vedere Creare una vista materializzata o una tabella di streaming di tabelle live Delta.

Importare il modulo Tabelle live Delta

Tutte le API Python per le tabelle live Delta vengono implementate nel dlt modulo. Importare in modo esplicito il dlt modulo nella parte superiore di notebook e file Python.

L'esempio seguente illustra questa importazione, insieme alle istruzioni import per pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Scaricare i dati

Per ottenere i dati per questo esempio, scaricare un file CSV e archiviarlo nel volume come indicato di seguito:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Sostituire <catalog-name>, <schema-name>e <volume-name> con i nomi di catalogo, schema e volume per un volume di Unity Catalog.

Creare una tabella da file nell'archivio oggetti

Le tabelle live delta supportano il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

L'elemento @dlt.table Decorator indica a Delta Live Tables di creare una tabella contenente il risultato di un DataFrame oggetto restituito da una funzione. Aggiungere l'elemento @dlt.table Decorator prima di qualsiasi definizione di funzione Python che restituisce un dataframe Spark per registrare una nuova tabella nelle tabelle live Delta. Nell'esempio seguente viene illustrato l'uso del nome della funzione come nome della tabella e l'aggiunta di un commento descrittivo alla tabella:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Aggiungere una tabella da un set di dati upstream nella pipeline

È possibile usare dlt.read() per leggere i dati da altri set di dati dichiarati nella pipeline di tabelle live Delta correnti. La dichiarazione di nuove tabelle in questo modo crea una dipendenza che le tabelle live Delta risolvono automaticamente prima di eseguire gli aggiornamenti. Il codice seguente include anche esempi di monitoraggio e applicazione della qualità dei dati con le aspettative. Vedere Gestire la qualità dei dati con le tabelle live Delta.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Creare una tabella con viste dati arricchite

Poiché le tabelle live Delta elaborano gli aggiornamenti alle pipeline come una serie di grafici di dipendenza, è possibile dichiarare visualizzazioni altamente arricchite che alimentano dashboard, BI e analisi dichiarando tabelle con logica di business specifica.

Le tabelle nelle tabelle live delta sono concettualmente equivalenti alle viste materializzate. Mentre le visualizzazioni tradizionali nella logica di esecuzione di Spark ogni volta che viene eseguita una query nella vista, una tabella Tabelle live Delta archivia la versione più recente dei risultati della query nei file di dati. Poiché le tabelle live Delta gestiscono gli aggiornamenti per tutti i set di dati in una pipeline, è possibile pianificare gli aggiornamenti della pipeline in modo che corrispondano ai requisiti di latenza per le viste materializzate e che le query su queste tabelle contengano la versione più recente dei dati disponibili.

La tabella definita dal codice seguente illustra la somiglianza concettuale con una vista materializzata derivata dai dati upstream nella pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Per configurare una pipeline che usa il notebook, vedere Creare una pipeline.

Implementare una pipeline di tabelle live Delta con SQL

Databricks consiglia tabelle live Delta con SQL come metodo preferito per gli utenti SQL di creare nuove pipeline di trasformazione, inserimento e inserimento in Azure Databricks. L'interfaccia SQL per le tabelle live Delta estende Spark SQL standard con molte nuove parole chiave, costrutti e funzioni con valori di tabella. Queste aggiunte a SQL standard consentono agli utenti di dichiarare dipendenze tra set di dati e distribuire un'infrastruttura di livello di produzione senza apprendere nuovi strumenti o concetti aggiuntivi.

Per gli utenti che hanno familiarità con i dataframe Spark e che necessitano di supporto per test e operazioni più estese che sono difficili da implementare con SQL, ad esempio le operazioni di metaprogrammazione, Databricks consiglia di usare l'interfaccia Python. Vedere Esempio: Inserire ed elaborare i dati dei nomi dei bambini di New York.

Scaricare i dati

Per ottenere i dati per questo esempio, copiare il codice seguente, incollarlo in un nuovo notebook e quindi eseguire il notebook. Per esaminare le opzioni per la creazione di notebook, vedere Creare un notebook.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Sostituire <catalog-name>, <schema-name>e <volume-name> con i nomi di catalogo, schema e volume per un volume di Unity Catalog.

Creare una tabella da file in Unity Catalog

Per il resto di questo esempio, copiare i frammenti SQL seguenti e incollarli in un nuovo notebook SQL, separato dal notebook nella sezione precedente. È necessario aggiungere ogni frammento SQL di esempio alla propria cella nel notebook nell'ordine descritto.

Le tabelle live delta supportano il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

Tutte le istruzioni SQL di Delta Live Tables usano CREATE OR REFRESH sintassi e semantica. Quando si aggiorna una pipeline, Delta Live Tables determina se il risultato logico corretto per la tabella può essere eseguito tramite l'elaborazione incrementale o se è necessaria la ricomputazione completa.

L'esempio seguente crea una tabella caricando i dati dal file CSV archiviato nel volume di Unity Catalog:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Sostituire <catalog-name>, <schema-name>e <volume-name> con i nomi di catalogo, schema e volume per un volume di Unity Catalog.

Aggiungere una tabella da un set di dati upstream alla pipeline

È possibile usare lo live schema virtuale per eseguire query sui dati di altri set di dati dichiarati nella pipeline di tabelle live Delta correnti. La dichiarazione di nuove tabelle in questo modo crea una dipendenza che le tabelle live Delta risolvono automaticamente prima di eseguire gli aggiornamenti. Lo live schema è una parola chiave personalizzata implementata in Tabelle Live Delta che possono essere sostituite da uno schema di destinazione se si desidera pubblicare i set di dati. Vedere Usare Unity Catalog con le pipeline delle tabelle live Delta e Pubblicare dati da tabelle live Delta al metastore Hive.

Il codice seguente include anche esempi di monitoraggio e applicazione della qualità dei dati con le aspettative. Vedere Gestire la qualità dei dati con le tabelle live Delta.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Creare una vista dati arricchita

Poiché le tabelle live Delta elaborano gli aggiornamenti alle pipeline come una serie di grafici di dipendenza, è possibile dichiarare visualizzazioni altamente arricchite che alimentano dashboard, BI e analisi dichiarando tabelle con logica di business specifica.

Le tabelle live sono concettualmente equivalenti alle viste materializzate. Mentre le visualizzazioni tradizionali nella logica di esecuzione di Spark ogni volta che viene eseguita una query sulla vista, le tabelle live archiviano la versione più recente dei risultati delle query nei file di dati. Poiché le tabelle live Delta gestiscono gli aggiornamenti per tutti i set di dati in una pipeline, è possibile pianificare gli aggiornamenti della pipeline in modo che corrispondano ai requisiti di latenza per le viste materializzate e che le query su queste tabelle contengano la versione più recente dei dati disponibili.

Il codice seguente crea una vista materializzata arricchita dei dati upstream:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Per configurare una pipeline che usa il notebook, continuare a Creare una pipeline.

Creare una pipeline

Le tabelle live delta creano pipeline risolvendo le dipendenze definite nei notebook o nei file (denominati codice sorgente o librerie) usando la sintassi Delta Live Tables. Ogni file di codice sorgente può contenere un solo linguaggio, ma è possibile combinare librerie di linguaggi diversi nella pipeline.

  1. Fare clic su Delta Live Tables (Tabelle attive Delta) nella barra laterale e fare clic su Create Pipeline (Crea pipeline).
  2. Assegnare alla pipeline un nome.
  3. (Facoltativo) Selezionare la casella di controllo Serverless per usare il calcolo completamente gestito per questa pipeline. Quando si seleziona Serverless, le impostazioni di calcolo vengono rimosse dall'interfaccia utente.
  4. (Facoltativo) Selezionare un'edizione del prodotto.
  5. Selezionare Attivato per Modalità pipeline.
  6. Configurare uno o più notebook contenenti il codice sorgente per la pipeline. Nella casella di testo Percorsi immettere il percorso di un notebook o fare clic Icona selezione file per selezionare un notebook.
  7. Selezionare una destinazione per i set di dati pubblicati dalla pipeline, ovvero il metastore Hive o il catalogo Unity. Vedere Pubblicare set di dati.
    • Metastore Hive:
      • (Facoltativo) Immettere un percorso Archiviazione per i dati di output dalla pipeline. Se si lascia Archiviazione percorso vuoto, il sistema usa una posizione predefinita.
      • (Facoltativo) Specificare uno schema di destinazione per pubblicare il set di dati nel metastore Hive.
    • Catalogo unity: specificare un catalogo e uno schema di destinazione per pubblicare il set di dati nel catalogo Unity.
  8. (Facoltativo) Se non è stata selezionata l'opzione Serverless, è possibile configurare le impostazioni di calcolo per la pipeline. Per informazioni sulle opzioni per le impostazioni di calcolo, vedere Configurare le impostazioni della pipeline per le tabelle Live Delta.
  9. (Facoltativo) Fare clic su Aggiungi notifica per configurare uno o più indirizzi di posta elettronica per ricevere notifiche per gli eventi della pipeline. Vedere Aggiungere notifiche di posta elettronica per gli eventi della pipeline.
  10. (Facoltativo) Configurare le impostazioni avanzate per la pipeline. Per informazioni sulle opzioni per le impostazioni avanzate, vedere Configurare le impostazioni della pipeline per le tabelle live Delta.
  11. Fai clic su Crea.

Il sistema visualizza la pagina Dettagli pipeline dopo aver fatto clic su Crea. È anche possibile accedere alla pipeline facendo clic sul nome della pipeline nella scheda Tabelle live Delta.

Avviare un aggiornamento della pipeline

Per avviare un aggiornamento per una pipeline, fare clic sul Icona di avvio delle tabelle live delta pulsante nel pannello superiore. Il sistema restituisce un messaggio che conferma l'avvio della pipeline.

Dopo aver avviato correttamente l'aggiornamento, il sistema Delta Live Tables:

  1. Avvia un cluster usando una configurazione cluster creata dal sistema Delta Live Tables. È anche possibile specificare una configurazione del cluster personalizzata.
  2. Crea tutte le tabelle che non esistono e garantisce che lo schema sia corretto per tutte le tabelle esistenti.
  3. Aggiornamenti tabelle con i dati più recenti disponibili.
  4. Arresta il cluster al termine dell'aggiornamento.

Nota

La modalità di esecuzione è impostata su Produzione per impostazione predefinita, che distribuisce risorse di calcolo temporanee per ogni aggiornamento. È possibile usare la modalità sviluppo per modificare questo comportamento, consentendo di usare le stesse risorse di calcolo per più aggiornamenti della pipeline durante lo sviluppo e il test. Vedere Sviluppo e modalità di produzione.

Pubblicare set di dati

È possibile rendere disponibili set di dati di Tabelle live Delta per l'esecuzione di query pubblicando tabelle nel metastore Hive o nel catalogo Unity. Se non si specifica una destinazione per la pubblicazione di dati, è possibile accedere alle tabelle create nelle pipeline di tabelle live Delta solo da altre operazioni nella stessa pipeline. Vedere Pubblicare dati da tabelle live Delta nel metastore Hive e Usare il catalogo Unity con le pipeline di tabelle live Delta.

Notebook del codice sorgente di esempio

È possibile importare questi notebook in un'area di lavoro di Azure Databricks e usarli per distribuire una pipeline di tabelle live Delta. Vedere Creare una pipeline.

Introduzione al notebook Python per tabelle live Delta

Ottenere il notebook

Introduzione al notebook SQL di Tabelle live Delta

Ottenere il notebook

Notebook di codice sorgente di esempio per le aree di lavoro senza catalogo Unity

È possibile importare questi notebook in un'area di lavoro di Azure Databricks senza che Unity Catalog sia abilitato e usarli per distribuire una pipeline di tabelle Live Delta. Vedere Creare una pipeline.

Introduzione al notebook Python per tabelle live Delta

Ottenere il notebook

Introduzione al notebook SQL di Tabelle live Delta

Ottenere il notebook