Sviluppare codice della pipeline con SQL

Delta Live Tables introduce diverse nuove parole chiave e funzioni SQL per definire views materializzate e tables di streaming nelle pipeline. Il supporto SQL per lo sviluppo di pipeline si basa sulle nozioni di base di Spark SQL e aggiunge il supporto per la funzionalità Structured Streaming.

Gli utenti che hanno familiarità con i dataframe PySpark potrebbero preferire lo sviluppo di codice della pipeline con Python. Python supporta test e operazioni più estese che sono difficili da implementare con SQL, ad esempio le operazioni di metaprogrammazione. Vedere Sviluppare codice della pipeline con Python.

Per un riferimento completo alla sintassi SQL di Delta Live Tables, vedere informazioni di riferimento sul linguaggio SQL di Delta Live Tables.

Nozioni di base di SQL per lo sviluppo di pipeline

Il codice SQL che crea set di dati di Tables Delta Live usa la sintassi CREATE OR REFRESH per definire views materializzati e tables di streaming sui risultati della query.

La STREAM parola chiave indica se l'origine dati a cui si fa riferimento in una SELECT clausola deve essere letta con la semantica di streaming.

Il codice sorgente di Delta Live Tables differisce in modo critico dagli script SQL: Delta Live Tables valuta tutte le definizioni di set di dati di tutti i file di codice sorgente configurati in una pipeline e costruisce un grafo di flusso di dati prima dell'esecuzione di qualsiasi query. L'ordine delle query visualizzate in un notebook o in uno script non definisce l'ordine di esecuzione.

Creare una vista materializzata con SQL

L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata con SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Creare uno streaming table con SQL

L'esempio di codice seguente illustra la sintassi di base per creare un table in streaming con SQL.

Nota

Non tutte le origini dati supportano le letture di streaming e alcune origini dati devono essere sempre elaborate con la semantica di streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Caricare dati dall'archivio oggetti

Delta Live Tables supporta il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

Nota

Questi esempi usano i dati disponibili nell'area di lavoro montata automaticamente nell'area /databricks-datasets di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cosa sono CatalogvolumesUnity?.

Databricks consiglia di usare Auto Loader e lo streamingtables quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nello storage di oggetti nel cloud. Vedere Che cos'è l’Autoloader?.

SQL usa la funzione per richiamare la read_files funzionalità del caricatore automatico. È anche necessario usare la STREAM parola chiave per configurare una lettura di streaming con read_files.

L'esempio seguente crea un flusso di dati table da file JSON usando Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

La funzione read_files supporta anche la semantica batch per creare viewsmaterializzati. L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Convalidare i dati con le aspettative

È possibile usare le aspettative per set e applicare vincoli di qualità dei dati. Consultare Gestire la qualità dei dati con le aspettative della pipeline.

Il codice seguente definisce un'aspettativa denominata valid_data che elimina i record null durante l'inserimento dati:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Query materializzate views e query di streaming tables definite nella pipeline

Usare il LIVEschema per interrogare altri views materializzati e tables di flusso definiti nella pipeline.

L'esempio seguente definisce quattro set di dati:

  • Flusso table chiamato orders che carica i dati JSON.
  • Vista materializzata denominata customers che carica i dati CSV.
  • Una vista materializzata denominata customer_orders che unisce i record dai orders set di dati e customers , esegue il cast del timestamp dell'ordine a una data e seleziona i customer_idcampi , order_number, statee order_date .
  • Vista materializzata denominata daily_orders_by_state che aggrega il conteggio giornaliero degli ordini per ogni stato.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;