Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare l'istruzione CREATE FLOW per creare flussi o backfill per le tabelle in una pipeline.
Sintassi
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parametri
flow_name
Nome del flusso da creare.
COMMENTO
Descrizione facoltativa per il flusso.
-
Istruzione
AUTO CDC ... INTOche definisce il flusso, con un elementocreate_auto_cdc_flow_spec. È necessario includere un'istruzioneAUTO CDC ... INTOo un'istruzioneINSERT INTO. UsareAUTO CDC ... INTOquando la query di origine usa la semantica delle modifiche ai dati.Per altre informazioni, vedere AUTO CDC INTO (pipelines).
target_table
Tabella da aggiornare. Deve trattarsi di una tabella di streaming.
INSERT IN
Definisce una query di tabella che viene inserita nella tabella di destinazione. Se l'opzione
ONCEnon viene specificata, la query deve essere una query di streaming . Utilizzare la parola chiave STREAM per utilizzare la semantica di streaming per leggere dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta. Per inserire dati con commit delle modifiche, è possibile usare Python e l'opzioneSkipChangeCommitsper gestire gli errori.INSERT INTOè mutuamente esclusivo rispetto aAUTO CDC ... INTO. UsareAUTO CDC ... INTOquando i dati di origine includono la funzionalità Change Data Capture (CDC). UsareINSERT INTOquando la sorgente non lo fa.Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.
Una volta
È possibile definire il flusso come un flusso singolo, ad esempio un backfill. L'uso di
ONCEmodifica il flusso in due modi:- L'origine
queryocreate_auto_cdc_flow_specnon è una tabella di streaming. - Il flusso viene eseguito una sola volta per impostazione predefinita. Se la pipeline viene aggiornata con un aggiornamento completo, il
ONCEflusso verrà eseguito di nuovo per ricreare i dati.
- L'origine
Esempi
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;