Informazioni di riferimento sul linguaggio SQL per le tabelle live delta

Questo articolo fornisce informazioni dettagliate sull'interfaccia di programmazione SQL di Tabelle Live Delta.

  • Per informazioni sull'API Python, vedere le informazioni di riferimento sul linguaggio Python per le tabelle live Delta.
  • Per altre informazioni sui comandi SQL, vedere Informazioni di riferimento sul linguaggio SQL.

È possibile usare funzioni definite dall'utente Python nelle query SQL, ma è necessario definire queste funzioni definite dall'utente nei file Python prima di chiamarle nei file di origine SQL. Vedere Funzioni scalari definite dall'utente - Python.

Limiti

La clausola PIVOT non è supportata. L'operazione pivot in Spark richiede il caricamento eager dei dati di input per calcolare lo schema dell'output. Questa funzionalità non è supportata nelle tabelle Live Delta.

Creare una vista materializzata o una tabella di streaming di tabelle live Delta

La stessa sintassi SQL di base viene usata quando si dichiara una tabella di streaming o una vista materializzata (detta LIVE TABLEanche ).

È possibile dichiarare tabelle di streaming solo usando query che leggono su un'origine di streaming. Databricks consiglia di usare il caricatore automatico per l'inserimento in streaming di file dall'archiviazione di oggetti cloud. Vedere sintassi SQL del caricatore automatico.

È necessario includere la STREAM() funzione intorno a un nome di set di dati quando si specificano altre tabelle o viste nella pipeline come origine di streaming.

Di seguito viene descritta la sintassi per dichiarare viste materializzate e tabelle di streaming con SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Creare una vista Tabelle live Delta

Di seguito viene descritta la sintassi per dichiarare le viste con SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintassi SQL del caricatore automatico

Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map() funzione , è possibile passare qualsiasi numero di opzioni al cloud_files() metodo . Le opzioni sono coppie chiave-valore, in cui le chiavi e i valori sono stringhe. Per informazioni dettagliate sui formati e le opzioni di supporto, vedere Opzioni di formato file.

Esempio: Definire tabelle

È possibile creare un set di dati leggendo da un'origine dati esterna o da set di dati definiti in una pipeline. Per leggere da un set di dati interno, anteporre la LIVE parola chiave al nome del set di dati. L'esempio seguente definisce due set di dati diversi: una tabella denominata taxi_raw che accetta un file JSON come origine di input e una tabella denominata filtered_data che accetta la taxi_raw tabella come input:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Esempio: Leggere da un'origine di streaming

Per leggere i dati da un'origine di streaming, ad esempio Auto Loader o un set di dati interno, definire una STREAMING tabella:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Per altre informazioni sullo streaming dei dati, vedere Trasformare i dati con tabelle live Delta.

Controllare la modalità di materializzazione delle tabelle

Le tabelle offrono anche un controllo aggiuntivo della materializzazione:

  • Specificare la modalità di partizionamento delle tabelle tramite PARTITIONED BY. È possibile usare il partizionamento per velocizzare le query.
  • È possibile impostare le proprietà della tabella usando TBLPROPERTIES. Vedere Proprietà della tabella Tabelle live Delta.
  • Impostare un percorso di archiviazione usando l'impostazione LOCATION . Per impostazione predefinita, i dati della tabella vengono archiviati nel percorso di archiviazione della pipeline, se LOCATION non è impostato.
  • È possibile usare colonne generate nella definizione dello schema. Vedere Esempio: Specificare uno schema e colonne di partizione.

Nota

Per le tabelle di dimensioni inferiori a 1 TB, Databricks consiglia di consentire alle tabelle live delta di controllare l'organizzazione dei dati. A meno che la tabella non si cresca oltre un terabyte, in genere non è consigliabile specificare le colonne di partizione.

Esempio: Specificare uno schema e colonne di partizione

Facoltativamente, è possibile specificare uno schema quando si definisce una tabella. Nell'esempio seguente viene specificato lo schema per la tabella di destinazione, incluso l'uso di colonne generate da Delta Lake e la definizione delle colonne di partizione per la tabella:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Per impostazione predefinita, le tabelle live Delta deducono lo schema dalla table definizione se non si specifica uno schema.

Esempio: Definire vincoli di tabella

Nota

Il supporto dei vincoli di tabella è disponibile in anteprima pubblica. Per definire i vincoli di tabella, la pipeline deve essere una pipeline abilitata per il catalogo unity e configurata per l'uso del preview canale.

Quando si specifica uno schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Nell'esempio seguente viene definita una tabella con un vincolo di chiave primaria ed esterna:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Impostare i valori di configurazione per una tabella o una vista

Usare SET per specificare un valore di configurazione per una tabella o una vista, incluse le configurazioni di Spark. Qualsiasi tabella o vista definita in un notebook dopo che l'istruzione SET ha accesso al valore definito. Tutte le configurazioni spark specificate usando l'istruzione vengono usate durante l'esecuzione SET della query Spark per qualsiasi tabella o vista dopo l'istruzione edizione Standard T. Per leggere un valore di configurazione in una query, usare la sintassi di interpolazione di stringhe ${}. L'esempio seguente imposta un valore di configurazione Spark denominato startDate e usa tale valore in una query:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Per specificare più valori di configurazione, usare un'istruzione separata SET per ogni valore.

Proprietà SQL

CREATE TABLE o VIEW
TEMPORARY

Creare una tabella ma non pubblicare i metadati per la tabella. La TEMPORARY clausola indica a Delta Live Tables di creare una tabella disponibile per la pipeline, ma non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, una tabella temporanea viene mantenuta per la durata della pipeline che lo crea e non solo per un singolo aggiornamento.
STREAMING

Creare una tabella che legge un set di dati di input come flusso. Il set di dati di input deve essere un'origine dati di streaming, ad esempio Auto Loader o una STREAMING tabella.
PARTITIONED BY

Elenco facoltativo di una o più colonne da utilizzare per il partizionamento della tabella.
LOCATION

Percorso di archiviazione facoltativo per i dati della tabella. Se non è impostato, per impostazione predefinita il sistema verrà impostato sul percorso di archiviazione della pipeline.
COMMENT

Descrizione facoltativa per la tabella.
column_constraint

Una chiave primaria informativa facoltativa o un vincolo di chiave esterna nella colonna.
table_constraint

Una chiave primaria informativa facoltativa o un vincolo di chiave esterna nella tabella.
TBLPROPERTIES

Elenco facoltativo di proprietà della tabella per la tabella.
select_statement

Query Delta Live Tables che definisce il set di dati per la tabella.
Clausola CONSTRAINT
EXPECT expectation_name

Definire il vincolo expectation_namedata quality . Se ON VIOLATION il vincolo non è definito, aggiungere righe che violano il vincolo al set di dati di destinazione.
ON VIOLATION

Azione facoltativa da eseguire per le righe non riuscite:

* FAIL UPDATE: arresta immediatamente l'esecuzione della pipeline.
* DROP ROW: eliminare il record e continuare l'elaborazione.

Change Data Capture con SQL in tabelle live Delta

Usare l'istruzione per usare la APPLY CHANGES INTO funzionalità CDC di Tabelle Live Delta, come descritto di seguito:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

I vincoli di qualità dei dati per una APPLY CHANGES destinazione vengono definiti usando la stessa CONSTRAINT clausola delle query nonAPPLY CHANGES . Vedere Gestire la qualità dei dati con le tabelle live Delta.

Nota

Il comportamento predefinito per INSERT gli eventi e UPDATE consiste nell'eseguire l'upsert degli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con la APPLY AS DELETE WHEN condizione .

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della APPLY CHANGES tabella di destinazione, è necessario includere anche le __START_AT colonne e __END_AT con lo stesso tipo di dati del sequence_by campo.

Vedere Change Data Capture semplificato con l'API APPLY CHANGES in Delta Live Tables.

Clausole
KEYS

Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.

Questa clausola è obbligatoria.
IGNORE NULL UPDATES

Consentire l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e viene specificato IGNORE NULL UPDATES, le colonne con un null manterranno i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con il valore .null

La clausola è facoltativa.

L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con null valori.
APPLY AS DELETE WHEN

Specifica quando un evento CDC deve essere considerato come un DELETE upsert anziché un upsert. Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come rimozione definitiva nella tabella Delta sottostante e viene creata una vista nel metastore che filtra tali tombe. L'intervallo di conservazione può essere configurato con
pipelines.cdc.tombstoneGCThresholdInSecondsproprietà table.

La clausola è facoltativa.
APPLY AS TRUNCATE WHEN

Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATEcompleta. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.

La APPLY AS TRUNCATE WHEN clausola è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta il troncamento.

La clausola è facoltativa.
SEQUENCE BY

Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. Le tabelle live delta usano questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.

Questa clausola è obbligatoria.
COLUMNS

Specifica un subset di colonne da includere nella tabella di destinazione. È possibile:

* Specificare l'elenco completo di colonne da includere: COLUMNS (userId, name, city).
* Specificare un elenco di colonne da escludere: COLUMNS * EXCEPT (operation, sequenceNum)

La clausola è facoltativa.

L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la COLUMNS clausola non è specificata.
STORED AS

Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.

La clausola è facoltativa.

Il valore predefinito è SCD di tipo 1.
TRACK HISTORY ON

Specifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:

* Specificare l'elenco completo delle colonne da tenere traccia: COLUMNS (userId, name, city).
* Specificare un elenco di colonne da escludere dal rilevamento: COLUMNS * EXCEPT (operation, sequenceNum)

La clausola è facoltativa. Il valore predefinito è la cronologia delle tracce per tutte le colonne di output quando sono presenti modifiche, equivalenti a TRACK HISTORY ON *.