Condividi tramite


CREATE STREAMING TABLE (pipeline)

Una tabella di streaming è una tabella con supporto per lo streaming o l'elaborazione incrementale dei dati. Le tabelle di streaming sono alimentate dalle pipeline. Ogni volta che viene aggiornata una tabella di streaming, i dati aggiunti alle tabelle di origine vengono aggiunti alla tabella di streaming. È possibile aggiornare le tabelle di streaming manualmente o in base a una pianificazione.

Per altre informazioni su come eseguire o pianificare gli aggiornamenti, vedere Eseguire un aggiornamento della pipeline.

Sintassi

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]

Parametri

  • REFRESH

    Se specificato, creerà la tabella o aggiornerà una tabella esistente e il relativo contenuto.

  • PRIVATO

    Crea una tabella di streaming privata.

    • Non vengono aggiunti al catalogo e sono accessibili solo all'interno della pipeline di definizione
    • Possono avere lo stesso nome di un oggetto esistente nel catalogo. All'interno della pipeline, se una tabella di streaming privata e un oggetto nel catalogo hanno lo stesso nome, i riferimenti al nome si risolveranno nella tabella di streaming privata.
    • Le tabelle di streaming private vengono mantenute solo per tutta la durata della pipeline, non solo per un singolo aggiornamento.

    Le tabelle di streaming private sono state create in precedenza con il TEMPORARY parametro .

  • table_name

    Nome della tabella appena creata. Il nome della tabella, completo e qualificato, deve essere univoco.

  • specifica_tavola

    Questa clausola facoltativa definisce l'elenco di colonne, i relativi tipi, proprietà, descrizioni e vincoli di colonna.

  • vincolo_di_tabella

    Importante

    Questa funzionalità è in Anteprima Pubblica.

    Quando si specifica uno schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Consulta la clausola CONSTRAINT nella guida di riferimento del linguaggio SQL.

    Annotazioni

    Per definire i vincoli di tabella, la pipeline deve avere il Catalogo Unity abilitato.

  • table_clauses

    Facoltativamente, specificare il partizionamento, i commenti e le proprietà definite dall'utente per la tabella. Ogni clausola secondaria può essere specificata una sola volta.

    • USO DI DELTA

      Specifica il formato dati. L'unica opzione è DELTA.

      Questa clausola è facoltativa e l'impostazione predefinita è DELTA.

    • PARTIZIONATO PER

      Elenco facoltativo di una o più colonne da utilizzare per il partizionamento nella tabella. Si escludono con CLUSTER BY a vicenda.

      Il clustering liquido offre una soluzione flessibile e ottimizzata per il clustering. È consigliabile usare CLUSTER BY anziché PARTITIONED BY per le pipeline.

    • CLUSTER BY

      Abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering. Usare il clustering liquido automatico con CLUSTER BY AUTOe Databricks sceglie in modo intelligente le chiavi di clustering per ottimizzare le prestazioni delle query. Si escludono con PARTITIONED BY a vicenda.

      Vedere Usare clustering liquido per le tabelle.

    • UBICAZIONE

      Posizione di archiviazione facoltativa per i dati della tabella. Se non impostato, il sistema userà per impostazione predefinita il percorso di archiviazione della pipeline.

    • COMMENTO

      Un valore letterale facoltativo STRING per descrivere la tabella.

    • TBLPROPERTIES

      Elenco facoltativo delle proprietà della tabella.

    • CON ROW FILTER

    Importante

    Questa funzionalità è in Anteprima Pubblica.

    Aggiunge una funzione di filtro di riga alla tabella. Le future query per tale tabella ricevono un sottoinsieme delle righe per cui la funzione restituisce TRUE. Ciò è utile per il controllo di accesso con granularità fine, perché consente alla funzione di controllare l'identità e le appartenenze ai gruppi dell'utente che richiama per decidere se filtrare determinate righe.

    Vedere la clausola ROW FILTER.

  • quesito

    Questa clausola popola la tabella usando i dati di query. Questa query deve essere una query di streaming. Utilizzare la parola chiave STREAM per utilizzare la semantica di streaming per leggere dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta. Per inserire dati con commit delle modifiche, è possibile usare Python e l'opzione SkipChangeCommits per gestire gli errori.

    Quando si specifica un query e un table_specification insieme, lo schema della tabella specificato in table_specification deve contenere tutte le colonne restituite dal query, in caso contrario viene visualizzato un errore. Qualsiasi colonna specificata in table_specification ma non restituita da query restituisce valori null quando viene eseguita una query.

    Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.

Autorizzazioni necessarie

L'utente run-as per una pipeline deve avere le autorizzazioni seguenti:

  • SELECT privilegi sulle tabelle di base a cui fa riferimento la tabella di streaming.
  • Il privilegio USE CATALOG sul catalogo padre e il privilegio USE SCHEMA sullo schema padre.
  • CREATE MATERIALIZED VIEW privilegio sullo schema della tabella di streaming.

Affinché un utente possa aggiornare la pipeline all'interno della quale è definita la tabella di streaming, è necessario:

  • Il privilegio USE CATALOG sul catalogo padre e il privilegio USE SCHEMA sullo schema padre.
  • Proprietà della tabella di streaming o privilegio sulla tabella di streaming.
  • Il proprietario della tabella di streaming deve avere il SELECT privilegio sulle tabelle di base a cui fa riferimento la tabella di streaming.

Affinché un utente sia in grado di eseguire query sulla tabella di streaming risultante, è necessario:

  • Il privilegio USE CATALOG sul catalogo padre e il privilegio USE SCHEMA sullo schema padre.
  • SELECT privilegio sulla tabella di streaming.

Limitazioni

  • Solo i proprietari delle tabelle possono aggiornare le tabelle di streaming per ottenere i dati più recenti.
  • I comandi ALTER TABLE non sono consentiti nelle tabelle di streaming. La definizione e le proprietà della tabella devono essere modificate tramite l'istruzione CREATE OR REFRESH o ALTER STREAMING TABLE.
  • L'evoluzione dello schema di tabella tramite comandi DML come INSERT INTOe MERGE non è supportata.
  • I comandi seguenti non sono supportati nelle tabelle di streaming:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • La rinominazione della tabella o la modifica del proprietario non è supportata.
  • Le colonne generate, le colonne identità e le colonne predefinite non sono supportate.

Esempi

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;