CREATE STREAMING TABLE
Si applica a: Databricks SQL Databricks Runtime 13.3 LTS e versioni successive
Importante
Questa funzionalità è disponibile in anteprima pubblica. Per iscriversi per l'accesso, compilare questo modulo.
Crea una tabella di streaming, una tabella Delta con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati.
Le tabelle di streaming sono supportate solo nelle tabelle Live Delta e in Databricks SQL con Unity Catalog. L'esecuzione di questo comando nel calcolo di Databricks Runtime supportato analizza solo la sintassi. Vedere Implementare una pipeline di tabelle live Delta con SQL.
Sintassi
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Parametri
REFRESH
Se specificato, aggiorna la tabella con i dati più recenti disponibili dalle origini definite nella query. Vengono elaborati solo i nuovi dati che arrivano prima dell'avvio della query. I nuovi dati aggiunti alle origini durante l'esecuzione del comando vengono ignorati fino al successivo aggiornamento.
SE NON ESISTE
Se specificato e esiste già una tabella con lo stesso nome, l'istruzione viene ignorata.
IF NOT EXISTS
non può essere usato insieme aREFRESH
, il che significaCREATE OR REFRESH TABLE IF NOT EXISTS
che non è consentito.-
Nome della tabella da creare. Il nome non deve includere una specifica temporale. Se il nome non è qualificato, la tabella viene creata nello schema corrente.
table_specification
Questa clausola facoltativa definisce l'elenco di colonne, i relativi tipi, proprietà, descrizioni e vincoli di colonna.
Se non si definiscono colonne nello schema della tabella, è necessario specificare
AS query
.-
Nome univoco per la colonna.
-
Specifica il tipo di dati della colonna.
NOT NULL
Se la colonna specificata non accetta
NULL
valori.COMMENT column_comment
Valore letterale stringa per descrivere la colonna.
-
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Aggiunge una chiave primaria o un vincolo di chiave esterna alla colonna in una tabella di streaming. I vincoli non sono supportati per le tabelle nel
hive_metastore
catalogo. CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Aggiunge aspettative sulla qualità dei dati alla tabella. Queste aspettative sulla qualità dei dati possono essere rilevate nel tempo e accessibili tramite il registro eventi della tabella di streaming. Una
FAIL UPDATE
previsione causa un errore di elaborazione quando si crea la tabella e si aggiorna la tabella. SeDROP ROW
l'aspettativa non viene soddisfatta, l'intera riga viene eliminata.expectation_expr
può essere composto da valori letterali, identificatori di colonna all'interno della tabella e funzioni SQL predefinite o operatori, ad eccezione di:- Funzioni di aggregazione
- Funzioni della finestra analitica
- Funzioni della finestra di classificazione
- Funzioni generatore con valori di tabella
Inoltre
expr
, non deve contenere alcuna sottoquery.- Funzioni di aggregazione
-
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Aggiunge una chiave primaria informativa o vincoli di chiave esterna informativa a una tabella di streaming. I vincoli di chiave non sono supportati per le tabelle nel
hive_metastore
catalogo.
-
-
table_clauses
Facoltativamente, specificare partizionamento, commenti, proprietà definite dall'utente e una pianificazione di aggiornamento per la nuova tabella. Ogni clausola secondaria può essere specificata una sola volta.
-
Elenco facoltativo di colonne della tabella per partizionare la tabella.
COMMENT table_comment
Valore
STRING
letterale per descrivere la tabella.-
Facoltativamente, imposta una o più proprietà definite dall'utente.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Se specificato, pianifica la tabella di streaming o la vista materializzata per aggiornare i dati con la pianificazione cron di quarzi specificata. Vengono accettati solo time_zone_values .
AT TIME ZONE LOCAL
non è supportata. SeAT TIME ZONE
è assente, viene usato il fuso orario della sessione. SeAT TIME ZONE
è assente e il fuso orario della sessione non è impostato, viene generato un errore.SCHEDULE
è semanticamente equivalente aSCHEDULE REFRESH
.Non è possibile usare la
SCHEDULE
sintassi in una definizione di pipeline Delta Live Tables.La
SCHEDULE
clausola non è consentita in unCREATE OR REFRESH
comando. La pianificazione può essere specificata come parte delCREATE
comando . Usare ALTER STREAMING TABLE per modificare la pianificazione di una tabella di streaming dopo la creazione.
-
-
Questa clausola popola la tabella usando i dati di
query
. Questa query deve essere una query di streaming . A tale scopo, è possibile aggiungere laSTREAM
parola chiave a qualsiasi relazione da elaborare in modo incrementale. Quando si specifica un e untable_specification
query
insieme, lo schema della tabella specificato intable_specification
deve contenere tutte le colonne restituite daquery
, in caso contrario viene visualizzato un errore. Qualsiasi colonna specificata intable_specification
ma non restituita daiquery
null
valori restituiti quando viene eseguita una query.Questa clausola è necessaria per le tabelle di streaming create in Databricks SQL, ma non necessarie nelle tabelle Live Delta. Se questa clausola non è disponibile in Tabelle live Delta, è necessario fare riferimento a questa tabella in un
APPLY CHANGES
comando nella pipeline DLT. Vedere Change Data Capture con SQL in Tabelle live Delta.
Differenze tra tabelle di streaming e altre tabelle
Le tabelle di streaming sono tabelle con stato, progettate per gestire ogni riga una sola volta durante l'elaborazione di un set di dati in crescita. Poiché la maggior parte dei set di dati aumenta continuamente nel tempo, le tabelle di streaming sono valide per la maggior parte dei carichi di lavoro di inserimento. Le tabelle di streaming sono ottimali per le pipeline che richiedono aggiornamento dei dati e bassa latenza. Le tabelle di streaming possono essere utili anche per le trasformazioni su larga scala, poiché i risultati possono essere calcolati in modo incrementale man mano che arrivano nuovi dati, mantenendo i risultati aggiornati senza dover ricompilare completamente tutti i dati di origine con ogni aggiornamento. Le tabelle di streaming sono progettate per le origini dati che sono di sola accodamento.
Le tabelle di streaming accettano comandi aggiuntivi, ad REFRESH
esempio , che elabora i dati più recenti disponibili nelle origini fornite nella query. Le modifiche apportate alla query fornita vengono riflesse solo sui nuovi dati chiamando un REFRESH
oggetto , non elaborato in precedenza. Per applicare anche le modifiche ai dati esistenti, è necessario eseguire REFRESH TABLE <table_name> FULL
per eseguire un oggetto FULL REFRESH
. L'aggiornamento completo riprocessa tutti i dati disponibili nell'origine con la definizione più recente. Non è consigliabile chiamare aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, ad esempio Kafka, perché l'aggiornamento completo tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.
Limiti
Solo i proprietari delle tabelle possono aggiornare le tabelle di streaming per ottenere i dati più recenti.
ALTER TABLE
i comandi non sono consentiti nelle tabelle di streaming. La definizione e le proprietà della tabella devono essere modificate tramite l'istruzioneALTER STREAMING TABLE
.Le query di spostamento temporale non sono supportate.
L'evoluzione dello schema di tabella tramite comandi DML come
INSERT INTO
eMERGE
non è supportata.I comandi seguenti non sono supportati nelle tabelle di streaming:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
La condivisione differenziale non è supportata.
La ridenominazione della tabella o la modifica del proprietario non è supportata.
I vincoli di tabella,
PRIMARY KEY
ad esempio eFOREIGN KEY
, non sono supportati.Le colonne generate, le colonne Identity e le colonne predefinite non sono supportate.
Esempi
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');