Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Gunakan pernyataan CREATE FLOW untuk membuat alur atau pengisian ulang bagi tabel dalam sebuah pipeline.
Syntax
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parameter-parameternya
flow_name
Nama alur yang akan dibuat.
KOMENTAR
Deskripsi opsional untuk alur.
-
Perintah
AUTO CDC ... INTOyang menentukan alur, dengancreate_auto_cdc_flow_spec. Anda harus menyertakanAUTO CDC ... INTOpernyataan, atauINSERT INTOpernyataan. GunakanAUTO CDC ... INTOsaat kueri sumber menggunakan semantik data perubahan.Untuk informasi selengkapnya, lihat AUTO CDC INTO (pipeline).
target_table
Tabel yang akan diperbarui. Ini harus berupa tabel streaming.
INSERT KE
Menentukan suatu kueri tabel yang disisipkan ke dalam tabel target.
ONCEJika opsi tidak disediakan, kueri harus berupa kueri streaming. Gunakan kata kunci STREAM untuk menggunakan semantik streaming untuk membaca dari sumbernya. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan. Untuk memasukkan data yang memiliki komit perubahan, Anda dapat menggunakan Python dan opsiSkipChangeCommitsuntuk menangani kesalahan.INSERT INTOsaling eksklusif denganAUTO CDC ... INTO. GunakanAUTO CDC ... INTOsaat data sumber menyertakan fungsionalitas change data capture (CDC). GunakanINSERT INTOketika sumber tidak melakukannya.Untuk informasi selengkapnya tentang data streaming, lihat Mengubah data dengan alur.
SEKALI
Secara opsional tentukan alur sebagai aliran satu kali, seperti isi ulang. Dengan menggunakan
ONCE, alurnya dapat diubah dengan dua cara:- Sumber
queryataucreate_auto_cdc_flow_specbukan tabel streaming. - Alur dijalankan satu kali secara default. Jika pipeline diperbarui dengan refresh lengkap, maka alur
ONCEakan berjalan lagi untuk membuat ulang data.
- Sumber
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;