Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Lakeflow Spark Bildirimli İşlem Hatları, değişiklik veri yakalamayı (CDC) AUTO CDC ve AUTO CDC FROM SNAPSHOT API'leriyle basitleştirir. Bu API'ler, CDC akışından veya veritabanı anlık görüntülerinden yavaş değişen boyutların (SCD) Tür 1 ve Tür 2'nin işlem karmaşıklığını otomatikleştirir. Bu kavramlar hakkında daha fazla bilgi edinmek için bkz. Veri yakalamayı ve anlık görüntüleri değiştirme.
Uyarı
AUTO CDC API'ler, APPLY CHANGES API'lerin yerini alır ve aynı söz dizimine sahip.
APPLY CHANGES API'ler hala kullanılabilir durumdadır, ancak Databricks yerine AUTO CDC API'lerin kullanılmasını önerir.
Kullandığınız API, değişiklik verilerinizin kaynağına bağlıdır:
-
AUTO CDC: Kaynak veritabanında CDC akışı etkinleştirilmişse bunu kullanın.AUTO CDCdeğişiklik veri akışından (CDF) değişiklikleri işler. Hem istemci SQL hem de Python arabirimlerinde desteklenir. -
AUTO CDC FROM SNAPSHOT: Kaynak veritabanında CDC etkinleştirilmediğinde ve yalnızca anlık görüntüler kullanılabiliyorsa bunu kullanın. Bu API, değişiklikleri belirlemek için anlık görüntüleri karşılaştırır ve sonra bunları işler. Yalnızca Python arabiriminde desteklenir.
Her iki API de SCD Tür 1 ve Tür 2 kullanılarak tabloların güncelleştirilmesini destekler:
- Kayıtları doğrudan güncelleştirmek için SCD Tür 1'i kullanın. Güncelleştirilmiş kayıtlar için geçmiş korunmaz.
- Tüm güncelleştirmelerde veya belirtilen sütun kümesi güncelleştirmelerinde kayıtların geçmişini korumak için SCD Tür 2'yi kullanın.
API'ler AUTO CDC Apache Spark Bildirimli İşlem Hatları tarafından desteklenmez.
Sentaks ve diğer başvurular için bkz. AUTO CDC INTO (pipelines), create_auto_cdc_flow ve create_auto_cdc_from_snapshot_flow.
Uyarı
Bu sayfada, kaynak verilerdeki değişikliklere göre işlem hatlarınızdaki tabloların nasıl güncelleştirildiği açıklanır. Delta tabloları için satır düzeyi değişiklik bilgilerini kaydetmeyi ve sorgulamayı öğrenmek için bkz. Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
Gereksinimler
CDC API'lerini kullanmak için işlem hattınızın sunucusuz SDP veya SDPPro ya da Advanced sürümleri kullanacak şekilde yapılandırılması gerekir.
AUTO CDC nasıl çalışır?
AUTO CDC ile CDC işlemesi gerçekleştirmek için bir akış tablosu oluşturun ve ardından SQL'de AUTO CDC ... INTO deyimini veya Python’da create_auto_cdc_flow() işlevini kullanarak kaynak, anahtarlar ve değişiklik akışı için sıralama belirtin. Sıralama ve SCD mantığının nasıl çalıştığına ilişkin bir açıklama için bkz. Veri yakalamayı ve anlık görüntüleri değiştirme.
AUTO CDC örneklerine bakın.
Değişiklik akışı olan bir kaynaktan ilk hidrasyon için bir akışla AUTO CDC kullanın once ve ardından değişiklik akışını işlemeye devam edin. Bkz . AUTO CDC kullanarak dış RDBMS tablosunu çoğaltma.
Söz dizimi ayrıntıları için bkz. AUTO CDC INTO (pipelines) veya create_auto_cdc_flow.
AUTO CDC ANLIK GÖRÜNTÜDEN nasıl çalışır?
AUTO CDC FROM SNAPSHOT sıralı anlık görüntüleri karşılaştırarak kaynak verilerdeki değişiklikleri belirler. Yalnızca Python işlem hattı arabiriminde desteklenir. Delta tablosundan, bulut depolama dosyalarından veya JDBC'den anlık görüntüleri doğrudan okuyabilirsiniz.
CDC işlemesini AUTO CDC FROM SNAPSHOT ile gerçekleştirmek için bir akış tablosu oluşturun ve ardından create_auto_cdc_from_snapshot_flow() işlevini kullanarak anlık görüntüyü, anahtarları ve diğer bağımsız değişkenleri belirtin. İki veri alımı modeli ve her birinin ne zaman kullanılacağı hakkında ayrıntılı bilgi için bkz Anlık görüntü işleme desenleri.
AUTO CDC FROM SNAPSHOT örneklerine bakın.
Söz dizimi ayrıntıları için bkz. create_auto_cdc_from_snapshot_flow.
Sıralama için birden çok sütun kullanma
Birden çok sütuna göre sıralama yapmak için (örneğin, bağlamları çözmek için bir zaman damgası ve kimlik numarasını kullanarak), bunları birleştirmek için bir STRUCT kullanın. API ilk alana göre sipariş eder ve bir eşitlik durumunda ikinci alanı dikkate alır ve bu şekilde devam eder.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Piton
sequence_by = struct("timestamp_col", "id_col")
AUTO CDC örnekleri
Aşağıdaki örneklerde, değişiklik veri akışı kaynağı kullanılarak SCD Tür 1 ve Tür 2 işleme gösterilmektedir. Örnek veriler yeni kullanıcı kayıtları oluşturur, kullanıcı kaydını siler ve kullanıcı kayıtlarını güncelleştirir. SCD Tür 1 örneğinde, son UPDATE işlemler geç gelirler ve olayların sırasız işlenmesini göstererek hedef tablodan çıkarılır.
Bu örneklerde kullanılan giriş kayıtları aşağıda verilmiştir. Bu veriler, Örnek veri oluşturma bölümünde sorgu çalıştırılarak oluşturulur.
| userId | name | city | Operasyon | sıraNumarası |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Zambak | Cancun | INSERT | 2 |
| 123 | sıfır | sıfır | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Örnek veri oluşturma sorgusundaki son satırın açıklamasını kaldırdığınızda, tablonun kesilmesini (tabloyu temizleme) belirten aşağıdaki kaydı sequenceNum=3 ekler:
| userId | name | city | Operasyon | sıraNumarası |
|---|---|---|---|---|
| sıfır | sıfır | sıfır | TRUNCATE | 3 |
Uyarı
Aşağıdaki örneklerin tümü hem hem DELETE de TRUNCATE işlemleri belirtme seçeneklerini içerir, ancak her biri isteğe bağlıdır.
Örnek veri oluşturma
Örnek bir veri kümesi oluşturmak için aşağıdaki deyimleri çalıştırın. Bu kodun bir işlem hattı tanımının parçası olarak çalıştırılması amaçlanmamıştır. Bunu dönüştürmeler klasörü yerine işlem hattınızın keşif klasöründen çalıştırın.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
SCD Tür 1 güncelleştirmelerini işleme
SCD Tür 1, her kaydın yalnızca en son sürümünü tutar. Aşağıdaki örnek, yukarıda oluşturulan değişiklik veri akışından okur ve değişiklikleri akış tablosu hedefine uygular. Bu kodu çalıştırmak için Lakeflow Spark Bildirimli İşlem Hatları geliştirin.
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD Tür 1 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Zambak | Cancun |
Kullanıcı 123 (Isabel) silindi ve görünmüyor. Kullanıcı 125 (Mercedes) yalnızca en son şehri (Guadalajara) gösterir çünkü SCD Type 1 önceki değerlerin üzerine yazar. Daha sonraki bir güncelleştirme geldiğinden, önceki UPDATEsequenceNum=5 sürümü sequenceNum=6 bırakıldı.
Örneği TRUNCATE kaydı yorum satırı olmadan çalıştırdıktan sonra, tablo sequenceNum=3'de temizlenir. Bu, 124 ve 126 kayıtlarının tabloda olmadığı ve nihai hedef tablonun yalnızca aşağıdaki kaydı içerdiği anlamına gelir:
| userId | name | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
SCD Tür 2 güncelleştirmelerini işleme
SCD Tür 2, __START_AT__END_AT bir kaydın her sürümü için yeni satırlar oluşturarak ve her sürümün ne zaman etkin olduğunu gösteren sütunlar oluşturarak değişikliklerin tam geçmişini korur.
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD Tür 2 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | sıfır |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | sıfır |
| 126 | Zambak | Cancun | 2 | sıfır |
Tablo, eksiksiz geçmişi korur. Kullanıcı 123'ün iki sürümü vardır (silindiğinde 6. sırada sona erdi). Kullanıcı 125'in şehir değişikliklerini gösteren üç sürümü vardır. ile __END_AT = null kayıtlar şu anda etkindir.
SCD Tür 2 ile sütun alt kümesini izleme
Varsayılan olarak, HERHANGI bir sütun değeri değiştiğinde SCD Tür 2 yeni bir sürüm oluşturur. İzlenmesi gereken sütunların bir alt kümesini belirtebilirsiniz, böylece diğer sütunlarda yapılan değişiklikler yeni bir geçmiş kaydı oluşturmak yerine geçerli sürümü yerinde güncelleştirir.
Aşağıdaki örnek city sütununu geçmiş izlenmesinden hariç tutar:
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
city Değişiklikler izlenmediği için, şehir güncelleştirmeleri yeni bir sürüm oluşturmak yerine geçerli satırın üzerine yazılır. Hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | sıfır |
| 125 | Mercedes | Guadalajara | 2 | sıfır |
| 126 | Zambak | Cancun | 2 | sıfır |
AUTO CDC FROM SNAPSHOT örnekleri
Aşağıdaki bölümlerde, anlık görüntüleri SCD Tür 1 veya Tür 2 hedef tablolarında işlemek için kullanma AUTO CDC FROM SNAPSHOT örnekleri verilmiştir. Bu API'nin ne zaman kullanılacağına ilişkin arka plan için bkz. Veri yakalamayı ve anlık görüntüleri değiştirme.
Örnek: Alım zamanını kullanarak işlem hattı anlık görüntülerini oluşturma
Anlık görüntüler düzenli ve sırayla geldiğinde bu yaklaşımı kullanın ve sürüm oluşturma için işlem hattı çalıştırma zaman damgasını kullanabilirsiniz. Her işlem hattı güncelleştirmesi ile yeni bir anlık görüntü alınmaktadır.
Delta tabloları, bulut depolama dosyaları ve JDBC bağlantıları gibi birden çok kaynak türünden anlık görüntüleri okuyabilirsiniz.
1. Adım: Örnek veriler oluşturma
Anlık görüntü verilerini içeren bir tablo oluşturun. İşlem hattınızın klasöründe bir not defterinden veya Databricks SQL'den explorations aşağıdaki kodu çalıştırın:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
2. Adım: Anlık Görüntüden Auto CDC'yi çalıştırma
Bu adımda kodu çalıştırmak için Lakeflow Spark Bildirimli İşlem Hatları geliştirin.
Anlık görüntü görünümü için bir kaynak türü seçin (örnek oluşturma kodu bir Delta tablosu oluşturur):
Seçenek A: Delta tablosundan okuma
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
B Seçeneği: Bulut depolamadan okuma
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
C Seçeneği: JDBC'den okuma (yalnızca klasik işlem)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Tüm seçenekleri hedefe yaz
Ardından hedef tabloyu ve akışı ekleyin:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
İlk işlem hattı çalıştırıldıktan sonra tüm kayıtlar etkin satırlar olarak eklenir:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | sıfır |
| 2 | Monterrey | 0 | sıfır |
| 3 | Tijuana | 0 | sıfır |
Uyarı
Bunun yerine SCD Tür 1'i kullanmak ve yalnızca geçerli durumu korumak için ayarlayın stored_as_scd_type=1. Bu durumda, hedef tablo __START_AT ve __END_AT sütunlarını içermez.
3. Adım: Yeni bir anlık görüntü benzet ve yeniden çalıştır
Gelen yeni anlık görüntünün benzetimini yapmak için kaynak tabloyu güncelleştirin (bu kodu pipline klasörünüzdeki explorations bir not defterinden veya SQL dosyasından çalıştırın):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
İşlem hattını yeniden çalıştırın.
AUTO CDC FROM SNAPSHOT yeni anlık görüntüyü öncekiyle karşılaştırır ve 1 kullanıcısının silindiğini, 2. ve 3. kullanıcıların güncelleştirildiğini ve 4. ve 6. kullanıcıların eklendiğini algılar. Bu, bir değişiklik akışı oluşturur ve çıkış tablosunu oluşturmak için AUTO CDC kullanır.
SCD Tür 2 ile ikinci çalıştırmadan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | sıfır |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | sıfır |
| 4 | Ölüm Vadisi | 1 | sıfır |
| 6 | Kings Kanyonu | 1 | sıfır |
Kullanıcı 1 sonlandırıldı (silindi). 2 ve 3 kullanıcılarının her birinin şehir değişikliklerini gösteren iki sürümü vardır. 4 ve 6 kullanıcıları yeni eklendi.
SCD Tür 1 ile ikinci çalıştırmadan sonra hedef tablo yalnızca geçerli durumu gösterir:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Ölüm Vadisi |
| 6 | Kings Kanyonu |
Örnek: Sürüm işlevlerini kullanarak anlık görüntüleri işleme
Anlık görüntü sıralama üzerinde açık denetime ihtiyacınız olduğunda bu yaklaşımı kullanın. Örneğin, aynı anda birden çok anlık görüntü geldiğinde veya anlık görüntüler sıra dışında geldiğinde bu yaklaşımı kullanın. Bir işlev yazarsınız, hangi anlık görüntünün işleneceğini ve sürüm numarasını belirler. API, anlık görüntüleri artan sürüm sırasına göre işler:
- Depolama alanında birden çok anlık görüntü varsa, bunların tümü sırayla işlenir.
- Anlık görüntülerin sırası karışırsa (örneğin,
snapshot_3snapshot_4'den sonra gelirse), atlanır. - Yeni anlık görüntü yoksa işlev
Nonedöndürüyor ve hiçbir işlem gerçekleşmiyor.
1. Adım: Anlık görüntü dosyalarını hazırlama
Anlık görüntü verilerini içeren CSV dosyaları oluşturun ve bunları bir birime veya bulut depolama konumuna ekleyin. Dosyaları kronolojik olarak adlandırın (örneğin, snapshot_1.csv, snapshot_2.csv).
Her dosya userId ve city sütunlarını içermelidir. Örneğin:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Ölüm Vadisi |
2. Adım: AUTO CDC FROM SNAPSHOT komutunu bir sürüm fonksiyonu ile çalıştırın
Yeni bir not defteri oluşturun ve aşağıdaki işlem hattı kodunu yapıştırın. Ardından Lakeflow Spark Bildirimli Veri Hatları Geliştirin.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Uyarı
Bunun yerine SCD Tür 1'i kullanmak için değerini ayarlayın stored_as_scd_type=1.
işlendikten snapshot_1.csvsonra hedef tablo aşağıdaki kayıtları içerir:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | sıfır |
| 2 | Monterrey | 1 | sıfır |
| 3 | Tijuana | 1 | sıfır |
işlendikten snapshot_2.csvsonra hedef tablo aşağıdaki kayıtları içerir:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | sıfır |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | sıfır |
| 4 | Ölüm Vadisi | 2 | sıfır |
Uyarı
SCD Tür 1 için tablonun en son anlık görüntüye tam olarak benzediğini unutmayın. Aradaki fark, aşağı akış sorgularının değişiklik akışını yalnızca değiştirilen kayıtları işlemek için kullanabilmesidir.
3. Adım: Yeni anlık görüntüler ekleme
Değiştirilen verilerle (örneğin, değiştirilen şehir değerleri, yeni satırlar veya kaldırılan satırlar) depolama konumuna yeni bir CSV dosyası ekleyin. Ardından yeni anlık görüntüyü işlemek için işlem hattını yeniden çalıştırın.
Sınırlamalar
- Sıralama sütunu sıralanabilir bir veri türü olmalıdır.
NULLsıralama değerleri desteklenmez. -
AUTO CDC FROM SNAPSHOTyalnızca Python işlem hattı arabiriminde desteklenir; SQL arabirimi desteklenmez.
Ek kaynaklar
- Veri yakalama ve anlık görüntüleri değiştirme: CDC kavramları, anlık görüntüler ve SCD türleri hakkında bilgi edinin.
-
AUTO CDCkullanarak bir dış RDBMS tablosunu çoğaltın:onceakışı ile ilk hidrasyonu gerçekleştirmeyi ve ardından değişiklikleri işlemeye devam etmeyi öğrenin. - Gelişmiş AUTO CDC konuları: AUTO CDC hedeflerindeki değişiklik işlemleri, değişiklik veri akışlarını okuma ve ölçümleri işleme hakkında bilgi edinin.
- Öğretici: Değişiklik verilerini yakalamayı kullanarak ETL işlem hattı oluşturma