Condividi tramite


CREA FLUSSO (pipeline)

Usare l'istruzione CREATE FLOW per creare flussi o backfill per le tabelle in una pipeline.

Sintassi

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Parametri

  • flow_name

    Nome del flusso da creare.

  • COMMENTO

    Descrizione facoltativa per il flusso.

  • AUTO CDC INTO

    Istruzione AUTO CDC ... INTO che definisce il flusso, con un elemento create_auto_cdc_flow_spec. È necessario includere un'istruzione AUTO CDC ... INTO o un'istruzione INSERT INTO . Usare AUTO CDC ... INTO quando la query di origine usa la semantica delle modifiche ai dati.

    Per altre informazioni, vedere AUTO CDC INTO (pipelines).

  • target_table

    Tabella da aggiornare. Deve trattarsi di una tabella di streaming.

  • INSERT IN

    Definisce una query di tabella che viene inserita nella tabella di destinazione. Se l'opzione ONCE non viene specificata, la query deve essere una query di streaming . Utilizzare la parola chiave STREAM per utilizzare la semantica di streaming per leggere dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta. Per inserire dati con commit delle modifiche, è possibile usare Python e l'opzione SkipChangeCommits per gestire gli errori.

    INSERT INTO è mutuamente esclusivo rispetto a AUTO CDC ... INTO. Usare AUTO CDC ... INTO quando i dati di origine includono la funzionalità Change Data Capture (CDC). Usare INSERT INTO quando la sorgente non lo fa.

    Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.

  • Una volta

    È possibile definire il flusso come un flusso singolo, ad esempio un backfill. L'uso di ONCE modifica il flusso in due modi:

    • L'origine query o create_auto_cdc_flow_spec non è una tabella di streaming.
    • Il flusso viene eseguito una sola volta per impostazione predefinita. Se la pipeline viene aggiornata con un aggiornamento completo, il ONCE flusso verrà eseguito di nuovo per ricreare i dati.

Esempi

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;