Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Lakeflow Spark Bildirimli İşlem Hatları'nın değişiklik veri yakalama (CDC) işlevini kullanan bir akış oluşturmak için AUTO CDC ... INTO deyimini kullanın. Bu ifade, CDC kaynağındaki değişiklikleri okur ve bunları hedef akışa uygular.
- CDC hakkında bilgi edinmek için bkz. Değişiklik veri yakalama (CDC) nedir?.
-
AUTO CDCkullanımı hakkında daha fazla ayrıntı için bkz. AUTO CDC API'leri: İşlem hatlarıyla değişiklik verileri yakalamayı basitleştirme. - hakkında
CREATE FLOWdaha fazla ayrıntı için bkz. CREATE FLOW (işlem hatları).
Sözdizimi
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Diğer işlem hattı sorgularıyla aynı CONSTRAINT yan tümceyi kullanarak hedef için veri kalitesi kısıtlamaları tanımlarsınız. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.
INSERT ve UPDATE olayları için varsayılan davranış, kaynaktan UPDATE olaylarını güncelleştirmek veya eklemektir: hedef tabloda belirtilen anahtarla eşleşen satırları güncelleyin veya hedef tabloda eşleşen bir kayıt yoksa yeni bir satır ekleyin.
DELETE olayları için işleme APPLY AS DELETE WHEN koşuluyla belirtilebilir.
Önemli
Değişiklikleri uygulamak için bir hedef akış tablosu bildirmeniz gerekir. İsteğe bağlı olarak hedef tablonuzun şemasını belirtebilirsiniz. SCD tür 2 tabloları için, hedef tablonun şemasını belirtirken, __START_AT sütunuyla aynı veri türüne sahip olan __END_AT ve sequence_by sütunlarını da eklemeniz gerekir.
Bkz AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirin.
Parametreler
flow_nameOluşturulacak akışın adı.
sourceVerilerin kaynağı. Kaynak bir akış kaynağı olmalıdır. Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik taahhütleri içeren verileri içeri çekmek için Python'ı ve
SkipChangeCommitsseçeneğini hataları işlemek için kullanabilirsiniz.Akış verileri hakkında daha fazla bilgi için bkz. İşlem hatları ile veri dönüştürme.
KEYSKaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu sütunlardaki değerler, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır.
Sütunların birleşimini tanımlamak için virgülle ayrılmış sütun listesi kullanın.
Bu ana tümce gereklidir.
IGNORE NULL UPDATESHedef sütunların bir alt kümesini içeren güncelleştirmelerin alımına izin verir. CDC olayı mevcut bir satırla eşleştiğinde ve NULL GÜNCELLEŞTIRMELERİ YOKSAY belirtildiğinde, değeri
nullolan sütunlar hedefteki mevcut değerlerini korur. Bu, bir değeri olannulliç içe sütunlar için de geçerlidir.Bu yan tümce isteğe bağlıdır.
Varsayılan ayar,
nulldeğerlerinin mevcut sütunların üzerine yazılmasıdır.APPLY AS DELETE WHENBir CDC olayının ne zaman upsert yerine bir
DELETEolarak ele alınacağı belirtir.SCD tip 2 kaynaklar için, sırası bozuk verileri işlemek amacıyla, silinen satır temel Delta tablosunda geçici olarak bir "mezar taşı" olarak tutulur ve meta veri deposunda bu "mezar taşlarını" filtreleyen bir görünüm oluşturulur. Bekletme aralığı
pipelines.cdc.tombstoneGCThresholdInSecondsyapılandırılabilir.Bu yan tümce isteğe bağlıdır.
APPLY AS TRUNCATE WHENBir CDC olayının tam tablo
TRUNCATEolarak ne zaman ele alınacağı belirtir. Bu yan tümce hedef tablonun tam kesilmesini tetiklediğinden, yalnızca bu işlevi gerektiren belirli kullanım örnekleri için kullanılmalıdır.APPLY AS TRUNCATE WHENyan tümcesi yalnızca SCD türü 1 için desteklenir. SCD tür 2, kesme işlemini desteklemez.Bu yan tümce isteğe bağlıdır.
SEQUENCE BYKaynak verilerdeki CDC olaylarının mantıksal sırasını belirten sütun adı. İşlem hattı işleme, sıralama dışı gelen değişiklik olaylarını işlemek için bu sıralamayı kullanır.
Sıralama için birden çok sütun gerekiyorsa bir
STRUCTifade kullanın: önce ilk yapı alanına göre, sonra bir bağlama varsa ikinci alana göre sıralanır ve bu şekilde devam eder.Belirtilen sütunlar sıralanabilir veri türleri olmalıdır.
Bu madde gereklidir.
COLUMNSHedef tabloya eklenecek sütunların bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:
- Eklenecek sütunların tam listesini belirtin:
COLUMNS (userId, name, city). - Dışlanması gereken sütunların listesini belirtin:
COLUMNS * EXCEPT (operation, sequenceNum)
Bu yan tümce isteğe bağlıdır.
Varsayılan değer,
COLUMNSyan tümcesi belirtilmediğinde tüm sütunları hedef tabloya eklemektir.- Eklenecek sütunların tam listesini belirtin:
STORED ASKayıtların SCD türü 1 veya SCD tür 2 olarak depolanması.
Bu yan tümce isteğe bağlıdır.
Varsayılan değer SCD tür 1'dir.
TRACK HISTORY ONBelirtilen sütunlarda herhangi bir değişiklik olduğunda geçmiş kayıtları oluşturmak için çıkış sütunlarının bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:
- İzlenen sütunların tam listesini belirtin:
COLUMNS (userId, name, city). - İzlemenin dışında tutulacak sütunların listesini belirtin:
COLUMNS * EXCEPT (operation, sequenceNum)
Bu yan tümce isteğe bağlıdır. Varsayılan değer, herhangi bir değişiklik olduğunda tüm çıkış sütunlarının geçmişini izlemektir ve ile eşdeğerdir
TRACK HISTORY ON *.- İzlenen sütunların tam listesini belirtin:
Örnekler
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);