Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
CREATE FLOW ifadesini bir işlem hattındaki tablolar için akışlar veya geri yüklemeler oluşturmak için kullanın.
Sözdizimi
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parametreler
flow_name
Oluşturulacak akışın adı.
YORUM
Akış için isteğe bağlı bir açıklama.
-
AUTO CDC ... INTOile akışı tanımlayan bircreate_auto_cdc_flow_specifadesi. Ya birAUTO CDC ... INTOifadesi ya da birINSERT INTOifadesi eklemelisiniz. Kaynak sorgu değişiklik verisi semantiğini kullandığında kullanınAUTO CDC ... INTO.Daha fazla bilgi için bkz. AUTO CDC INTO (işlem hatları).
target_table
Güncelleştirilecek tablo. Bu bir Akış tablosu olmalıdır.
INSERT İÇİNE
Hedef tabloya eklenen tablo sorgusunu tanımlar. Seçenek
ONCE, verilmezse sorgu bir akış sorgusu olmalıdır. Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik taahhütleri içeren verileri içeri çekmek için Python'ı veSkipChangeCommitsseçeneğini hataları işlemek için kullanabilirsiniz.INSERT INTOileAUTO CDC ... INTObirbirini dışlar. Kaynak veriler değişiklik verisi yakalama (CDC) işlevselliği içerdiğinde kullanınAUTO CDC ... INTO. Kaynakta kullanılmadığındaINSERT INTOkullanın.Akış verileri hakkında daha fazla bilgi için bkz. İşlem hatları ile veri dönüştürme.
BİR DEFA
İsteğe bağlı olarak akışı bir kerelik akış olarak (örneğin, bir geri doldurma) tanımlayın. Kullanımı
ONCE, akışı iki şekilde değiştirir:- Kaynak
queryveyacreate_auto_cdc_flow_specbir akış tablosu değil. - Akış varsayılan olarak bir kez çalıştırılır. İşlem hattı eksiksiz bir yenilemeyle güncelleştirilirse
ONCEakış yeniden çalıştırarak verileri yeniden oluşturur.
- Kaynak
Örnekler
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;