Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Si applica a:
Databricks SQL
Crea una tabella di streaming , una tabella Delta con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati.
Le tabelle di streaming sono supportate solo nelle pipeline dichiarative di Lakeflow Spark e in Databricks SQL con il catalogo Unity. L'esecuzione di questo comando nel calcolo di Databricks Runtime supportato analizza solo la sintassi. Vedere Sviluppare codice di pipeline dichiarative di Lakeflow Spark con SQL.
Sintassi
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
Parametri
REFRESH
Se specificato, aggiorna la tabella con i dati più recenti disponibili dalle origini definite nella query. Vengono elaborati solo i nuovi dati che arrivano prima dell'avvio della query. I nuovi dati aggiunti alle origini durante l'esecuzione del comando vengono ignorati fino al successivo aggiornamento. L'operazione di aggiornamento da CREATE OR REFRESH è completamente dichiarativa. Se un comando refresh non specifica tutti i metadati dell'istruzione di creazione della tabella originale, i metadati non specificati vengono eliminati.
SE NON ESISTE
Crea la tabella di streaming se non esiste. Se esiste già una tabella con questo nome, l'istruzione
CREATE STREAMING TABLEviene ignorata.È possibile specificare al massimo uno di
IF NOT EXISTSoOR REFRESH.-
Il nome della tabella da creare. Il nome non deve includere una specifica temporale o una specifica delle opzioni. Se il nome non è qualificato, la tabella viene creata nello schema corrente.
specifica_tavola
Questa clausola facoltativa definisce l'elenco di colonne, i relativi tipi, proprietà, descrizioni e vincoli di colonna.
Se non si definiscono colonne nello schema della tabella, è necessario specificare
AS query.-
Nome univoco per la colonna.
NOT NULL
La colonna, se specificata, non accetta valori
NULL.COMMENT colonna_commento
Stringa letterale usata per descrivere la colonna.
-
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Aggiunge una chiave primaria o un vincolo di chiave esterna alla colonna in una tabella di streaming. I vincoli non sono supportati per le tabelle nel catalogo
hive_metastore. -
Aggiunge una funzione di mascheratura delle colonne per rendere anonimi i dati sensibili. Tutte le query successive di tale colonna ricevono il risultato della valutazione della funzione applicata alla colonna al posto del valore originale della colonna. Ciò può essere utile per scopi di controllo di accesso con granularità fine in cui la funzione può esaminare l'identità o le appartenenze ai gruppi dell'utente che richiama per decidere se redigere il valore.
CONSTRAINT nome_attesa ATTENDI (espressione_attesa) [ SU VIOLAZIONE { FALLIMENTO UPDATE | RIDUCI RIGA } ]
Aggiunge aspettative sulla qualità dei dati alla tabella. Queste aspettative sulla qualità dei dati possono essere rilevate nel tempo e accessibili tramite il registro eventi della tabella di streaming. Un'aspettativa
FAIL UPDATEfa fallire l'elaborazione sia durante la creazione che durante l'aggiornamento della tabella. Se l'aspettativaDROP ROWnon viene soddisfatta, l'intera riga viene eliminata.expectation_exprpossono essere costituiti da valori letterali, identificatori di colonna all'interno della tabella e funzioni o operatori SQL predefiniti, ad eccezione di:-
Funzioni di aggregazione
- funzioni della finestra analitica
- funzioni di finestra di classifica
- Funzioni generatrici con valori tabulari
Inoltre
exprnon deve contenere alcuna sottoquery.-
Funzioni di aggregazione
-
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Aggiunge una chiave primaria informativa o vincoli di chiave esterna informativa a una tabella di streaming. I vincoli di chiave non sono supportati per le tabelle nel catalogo
hive_metastore.
-
table_clauses
Facoltativamente, specificare partizionamento, commenti, proprietà definite dall'utente e una pianificazione di aggiornamento per la nuova tabella. Ogni clausola secondaria può essere specificata una sola volta.
-
Elenco facoltativo di colonne della tabella per partizionare la tabella.
Nota
Il clustering liquido offre una soluzione flessibile e ottimizzata per il clustering. È consigliabile usare
CLUSTER BYanzichéPARTITIONED BYper le tabelle di streaming. -
Clausola opzionale per raggruppare basandosi su un sottoinsieme di colonne. Usare il clustering liquido automatico con
CLUSTER BY AUTOe Databricks sceglie in modo intelligente le chiavi di clustering per ottimizzare le prestazioni delle query. Vedere Usare clustering liquido per le tabelle.Il clustering liquido non può essere combinato con
PARTITIONED BY. COMMENTO table_comment
Un
STRINGletterale per descrivere la tabella.COLLAZIONE PREDEFINITA UTF8_BINARY
Si applica a:
controllo SQL di Databricks
Databricks Runtime 17.1 e versioni successiveForza le regole di confronto predefinite della tabella di streaming su
UTF8_BINARY. Questa clausola è obbligatoria se lo schema in cui viene creata la tabella ha regole di confronto predefinite diverse daUTF8_BINARY. Le regole di confronto predefinite della tabella di streaming vengono usate come regole di confronto predefinite all'interno diquerye per i tipi di colonna.-
Facoltativamente, imposta una o più proprietà definite dall'utente.
Usare questa impostazione per specificare il canale di runtime di Lakeflow Spark Declarative Pipelines usato per eseguire questa istruzione. Impostare il valore della proprietà
pipelines.channelsu"PREVIEW"o"CURRENT". Il valore predefinito è"CURRENT". Per altre informazioni sui canali di pipeline dichiarative di Lakeflow Spark, vedere Canali di runtime di Lakeflow Spark Declarative Pipelines. schedule
La pianificazione può essere un'istruzione
SCHEDULEo un'istruzioneTRIGGER.SCHEDULE [ REFRESH ] clausola_di_programma
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }Per pianificare un aggiornamento che avviene periodicamente, utilizzare la sintassi
EVERY. Se viene specificata la sintassiEVERY, la tabella di streaming o la vista materializzata viene aggiornata periodicamente all'intervallo indicato in base al valore fornito, ad esempioHOUR,HOURS,DAY,DAYS,WEEKoWEEKS. Nella tabella seguente sono elencati i valori integer accettati pernumber.Unità di tempo Valore intero HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 Nota
Le forme singolari e plurali dell'unità temporale inclusa sono semanticamente equivalenti.
CRON cron_string [ AT TIME ZONE timezone_id ]Per pianificare un aggiornamento utilizzando un valore di quartz cron. Vengono accettati time_zone_values validi.
AT TIME ZONE LOCALnon è supportata.Se
AT TIME ZONEè assente, viene usato il fuso orario della sessione. SeAT TIME ZONEè assente e il fuso orario della sessione non è impostato, viene generato un errore.SCHEDULEè semanticamente equivalente aSCHEDULE REFRESH.
La pianificazione può essere specificata come parte del comando
CREATE. Usare ALTER STREAMING TABLE o eseguire il comandoCREATE OR REFRESHcon la clausolaSCHEDULEper modificare la pianificazione di una tabella di streaming dopo la creazione.TRIGGER ON UPDATE [ AL MASSIMO OGNI trigger_interval ]
Importante
La
TRIGGER ON UPDATEfunzionalità è in versione beta.Facoltativamente, impostare la tabella da aggiornare quando viene aggiornata un'origine dati upstream, al massimo una volta ogni minuto. Impostare un valore per per
AT MOST EVERYper richiedere almeno un tempo minimo tra gli aggiornamenti.Le origini dati upstream devono essere tabelle Delta esterne o gestite (incluse viste materializzate o tabelle di streaming) o viste gestite le cui dipendenze sono limitate ai tipi di tabella supportati.
L'abilitazione degli eventi di file può rendere i trigger più efficienti e aumenta alcuni dei limiti per gli aggiornamenti dei trigger.
trigger_intervalè un'istruzione INTERVAL di almeno 1 minuto.TRIGGER ON UPDATEpresenta le limitazioni seguenti- Non più di 10 sorgenti di dati upstream per tabella di flusso dati quando si utilizza TRIGGER ON UPDATE.
- È possibile specificare un massimo di 1000 tabelle di streaming o viste materializzate con TRIGGER ON UPDATE.
- Per impostazione predefinita, la
AT MOST EVERYclausola è 1 minuto e non può essere inferiore a 1 minuto.
-
-
Aggiunge una funzione di filtro di riga alla tabella. Tutte le query successive da tale tabella ricevono un sottoinsieme delle righe in cui la funzione restituisce il valore booleano TRUE. Ciò può essere utile per scopi di controllo di accesso con granularità fine in cui la funzione può controllare l'identità o le appartenenze ai gruppi dell'utente che richiama per decidere se filtrare determinate righe.
AS Query
Questa clausola popola la tabella usando i dati di
query. Questa query deve essere una query di streaming. A tale scopo, è possibile aggiungere la parola chiaveSTREAMa qualsiasi relazione da elaborare in modo incrementale. Quando si specifica unquerye untable_specificationinsieme, lo schema della tabella specificato intable_specificationdeve contenere tutte le colonne restituite dalquery, in caso contrario viene visualizzato un errore. Qualsiasi colonna specificata intable_specificationma non restituita daqueryrestituisce valorinullquando viene eseguita una query.
Differenze tra tabelle di streaming e altre tabelle
Le tabelle di streaming sono tabelle con stato, progettate per gestire ogni riga una sola volta durante l'elaborazione di un set di dati in crescita. Poiché la maggior parte dei set di dati aumenta continuamente nel tempo, le tabelle di streaming sono valide per la maggior parte dei carichi di lavoro di inserimento. Le tabelle di streaming sono ottimali per le pipeline che richiedono aggiornamento dei dati e bassa latenza. Le tabelle di streaming possono essere utili anche per le trasformazioni su larga scala, poiché i risultati possono essere calcolati in modo incrementale man mano che arrivano nuovi dati, mantenendo i risultati aggiornati senza dover ricompilare completamente tutti i dati di origine con ogni aggiornamento. Le tabelle di streaming sono progettate per origini dati che consentono solo l'aggiunta.
Le tabelle di streaming accettano comandi aggiuntivi, ad esempio REFRESH, che elabora i dati più recenti disponibili nelle origini fornite nella query. Le modifiche apportate alla query fornita vengono riflesse solo sui nuovi dati chiamando un REFRESH, mentre i dati già elaborati in precedenza non vengono influenzati. Per applicare le modifiche anche ai dati esistenti, è necessario eseguire REFRESH TABLE <table_name> FULL per eseguire FULL REFRESH. L'aggiornamento completo rielabora tutti i dati disponibili nell'origine usando la definizione più recente. Non è consigliabile chiamare aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, ad esempio Kafka, perché l'aggiornamento completo tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.
Filtri di riga e maschere di colonna
I filtri di riga consentono di specificare una funzione che viene applicata come filtro ogni volta che un'analisi di tabella recupera righe. Questi filtri assicurano che le query successive restituiscano solo righe per le quali il predicato di filtro restituisce TRUE.
Le maschere di colonna consentono di mascherare i valori di una colonna ogni volta che un'analisi di tabella recupera le righe. Tutte le query future che coinvolgono tale colonna riceveranno il risultato della valutazione della funzione applicata alla colonna, sostituendo il valore originale di essa.
Per altre informazioni su come usare filtri di riga e maschere di colonna, vedere Filtri di riga e maschere di colonna.
Gestione dei filtri di riga e delle maschere di colonna
I filtri di riga e le maschere di colonna nelle tabelle di streaming devono essere aggiunti, aggiornati o eliminati tramite l'istruzione CREATE OR REFRESH.
Comportamento
-
Aggiornare come Definer: quando le istruzioni
CREATE OR REFRESHoREFRESHaggiornano una tabella di streaming, le funzioni di filtro di riga vengono eseguite con i diritti del definitore (come proprietario della tabella). Ciò significa che l'aggiornamento della tabella usa il contesto di sicurezza dell'utente che ha creato la tabella di streaming. -
Query: mentre la maggior parte dei filtri viene eseguita con i diritti del definer, le funzioni che controllano il contesto utente (ad esempio
CURRENT_USEReIS_MEMBER) sono eccezioni. Queste funzioni vengono eseguite come chi le invoca. Questo approccio applica controlli di accesso e sicurezza dei dati specifici dell'utente in base al contesto dell'utente corrente.
Osservabilità
Usare DESCRIBE EXTENDED, INFORMATION_SCHEMAo Esplora cataloghi per esaminare i filtri di riga e le maschere di colonna esistenti applicabili a una determinata tabella di streaming. Questa funzionalità consente agli utenti di controllare ed esaminare le misure di accesso e protezione dei dati nelle tabelle di streaming.
Limiti
- Solo i proprietari delle tabelle possono aggiornare le tabelle di streaming per ottenere i dati più recenti.
- I comandi
ALTER TABLEnon sono consentiti nelle tabelle di streaming. La definizione e le proprietà della tabella devono essere modificate tramite l'istruzioneCREATE OR REFRESHo ALTER STREAMING TABLE. - L'evoluzione dello schema di tabella tramite comandi DML come
INSERT INTOeMERGEnon è supportata. - I comandi seguenti non sono supportati nelle tabelle di streaming:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- La condivisione Delta non è supportata.
- La rinominazione della tabella o la modifica del proprietario non è supportata.
- I vincoli di tabella,
PRIMARY KEYad esempio eFOREIGN KEY, non sono supportati per le tabelle di streaming nelhive_metastorecatalogo. - Le colonne generate, le colonne identità e le colonne predefinite non sono supportate.
Esempi
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')