DEĞIŞIKLIKLERI UYGULA API'leri: Delta Live Tablolarıyla değişiklik verilerini yakalamayı basitleştirme
Delta Live Tables, ve APPLY CHANGES FROM SNAPSHOT
API'leri ile değişiklik veri yakalamayı APPLY CHANGES
(CDC) basitleştirir. Kullandığınız arabirim, değişiklik verilerinin kaynağına bağlıdır:
- Değişiklik veri akışındaki (CDF) değişiklikleri işlemek için kullanın
APPLY CHANGES
. - Veritabanı anlık görüntülerindeki değişiklikleri işlemek için (Genel Önizleme) kullanın
APPLY CHANGES FROM SNAPSHOT
.
MERGE INTO
Daha önce deyimi, Azure Databricks'te CDC kayıtlarını işlemek için yaygın olarak kullanılıyordu. Ancak, MERGE INTO
sıra dışı kayıtlar nedeniyle yanlış sonuçlar verebilir veya kayıtları yeniden sıralamak için karmaşık mantık gerektirir.
APPLY CHANGES
API, Delta Live Tables SQL ve Python arabirimlerinde desteklenir. APPLY CHANGES FROM SNAPSHOT
API, Delta Live Tables Python arabiriminde desteklenir.
ScD tür 1 ve APPLY CHANGES FROM SNAPSHOT
tür 2 kullanarak tabloların güncelleştirilmesini hem hem de APPLY CHANGES
destekler:
- Kayıtları doğrudan güncelleştirmek için SCD tür 1'i kullanın. Güncelleştirilmiş kayıtlar için geçmiş korunmaz.
- Tüm güncelleştirmelerde veya belirtilen bir sütun kümesindeki güncelleştirmelerde kayıtların geçmişini korumak için SCD tür 2'yi kullanın.
Söz dizimi ve diğer başvurular için bkz:
- Delta Live Tablolarında Python ile değişiklik akışından veri yakalamayı değiştirme
- Delta Live Tablolarında SQL ile veri yakalamayı değiştirme
Not
Bu makalede, Delta Live Tables işlem hattınızdaki tabloların kaynak verilerdeki değişikliklere göre nasıl güncelleştirildiği açıklanır. Delta tabloları için satır düzeyi değişiklik bilgilerini kaydetmeyi ve sorgulamayı öğrenmek için bkz . Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
Gereksinimler
CDC API'lerini kullanmak için işlem hattınızın sunucusuz DLT işlem hatlarını veya Delta Live Tablolarını Pro
veya Advanced
sürümlerini kullanacak şekilde yapılandırılması gerekir.
CDC API ile APPLY CHANGES
nasıl uygulanır?
Delta Live Tables'daki API, APPLY CHANGES
sıra dışı kayıtları otomatik olarak işleyerek CDC kayıtlarının doğru işlenmesini sağlar ve sıra dışı kayıtları işlemek için karmaşık mantık geliştirme gereksinimini ortadan kaldırır. Kaynak verilerde kayıtların sıralanması için Delta Live Tablolarının kaynak verilerin düzgün sıralanması için monoton olarak artan bir temsil olarak yorumladığı bir sütun belirtmeniz gerekir. Delta Live Tables, sıra dışı gelen verileri otomatik olarak işler. SCD tür 2 değişiklikleri için Delta Live Tables, uygun sıralama değerlerini hedef tablonun ve __END_AT
sütunlarına yayılır__START_AT
. Her sıralama değerinde anahtar başına ayrı bir güncelleştirme olmalıdır ve NULL sıralama değerleri desteklenmez.
ile APPLY CHANGES
CDC işlemesi gerçekleştirmek için önce bir akış tablosu oluşturur ve ardından SQL'deki deyimini veya apply_changes()
Python'daki işlevi kullanarak APPLY CHANGES INTO
değişiklik akışının kaynağını, anahtarlarını ve sıralamasını belirtirsiniz. Hedef akış tablosunu oluşturmak için SQL'deki deyimini CREATE OR REFRESH STREAMING TABLE
veya create_streaming_table()
Python'daki işlevini kullanın. SCD tür 1 ve tür 2 işleme örneklerine bakın.
Söz dizimi ayrıntıları için Delta Live Tables SQL başvurusuna veya Python başvurusuna bakın.
CDC API ile APPLY CHANGES FROM SNAPSHOT
nasıl uygulanır?
Önemli
API Genel APPLY CHANGES FROM SNAPSHOT
Önizleme aşamasındadır.
APPLY CHANGES FROM SNAPSHOT
, bir dizi sıralı anlık görüntüyü karşılaştırarak kaynak verilerdeki değişiklikleri verimli bir şekilde belirleyen ve ardından anlık görüntülerdeki kayıtların CDC işlenmesi için gereken işlemeyi çalıştıran bildirim temelli bir API'dir. APPLY CHANGES FROM SNAPSHOT
yalnızca Delta Live Tables Python arabirimi tarafından desteklenir.
APPLY CHANGES FROM SNAPSHOT
birden çok kaynak türünden anlık görüntülerin alımını destekler:
- Mevcut bir tablo veya görünümden anlık görüntüleri almak için düzenli anlık görüntü alımını kullanın.
APPLY CHANGES FROM SNAPSHOT
mevcut bir veritabanı nesnesinden anlık görüntüleri düzenli aralıklarla alma desteği için basit, kolaylaştırılmış bir arabirime sahiptir. Her işlem hattı güncelleştirmesi ile yeni bir anlık görüntü alınıp alma süresi anlık görüntü sürümü olarak kullanılır. Bir işlem hattı sürekli modda çalıştırıldığında, ANLıK GÖRÜNTÜDEN DEĞİşİkLİk UYGULAMA işlemini içeren akışın tetikleyici aralığı ayarı tarafından belirlenen bir dönemde her işlem hattı güncelleştirmesi ile birden çok anlık görüntü alınmaktadır. - Oracle veya MySQL veritabanından ya da veri ambarından oluşturulan anlık görüntüler gibi veritabanı anlık görüntülerini içeren dosyaları işlemek için geçmiş anlık görüntü alımını kullanın.
ile APPLY CHANGES FROM SNAPSHOT
herhangi bir kaynak türünden CDC işlemesi gerçekleştirmek için önce bir akış tablosu oluşturur ve ardından python'da işlevini kullanarak apply_changes_from_snapshot()
işlemeyi uygulamak için gereken anlık görüntüyü, anahtarları ve diğer bağımsız değişkenleri belirtirsiniz. Düzenli anlık görüntü alımı ve geçmiş anlık görüntü alımı örneklerine bakın.
API'ye geçirilen anlık görüntülerin sürüme göre artan sırada olması gerekir. Delta Live Tables sıra dışı bir anlık görüntü algılarsa bir hata oluşur.
Söz dizimi ayrıntıları için Delta Live Tables Python başvurusuna bakın.
Sınırlamalar
Sıralama için kullanılan sütun sıralanabilir bir veri türü olmalıdır.
Örnek: CDF kaynak verileriyle SCD tür 1 ve SCD tür 2 işleme
Aşağıdaki bölümlerde, değişiklik veri akışındaki kaynak olayları temel alarak hedef tabloları güncelleştiren Delta Live Tables SCD tür 1 ve tür 2 sorguları örnekleri verilmiştir:
- Yeni kullanıcı kayıtları oluşturur.
- Kullanıcı kaydını siler.
- Kullanıcı kayıtlarını güncelleştirir. SCD tür 1 örneğinde, son
UPDATE
işlemler geç gelir ve sıra dışı olayların işlenmesini gösteren hedef tablodan bırakılır.
Aşağıdaki örneklerde Delta Live Tables işlem hatlarını yapılandırma ve güncelleştirme konusunda bilgi sahibi olduğunuz varsayılır. Bkz . Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma.
Bu örnekleri çalıştırmak için bir örnek veri kümesi oluşturarak başlamalısınız. Bkz . Test verileri oluşturma.
Bu örneklerin giriş kayıtları aşağıda verilmiştir:
userId | Adı | şehir | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Zambak | Cancun | INSERT | 2 |
123 | null | null | SİL | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Örnek verilerdeki son satırın açıklamasını kaldırdığınızda, kayıtların nerede kesilmesi gerektiğini belirten aşağıdaki kaydı ekler:
userId | Adı | şehir | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Not
Aşağıdaki örneklerin tümü hem hem TRUNCATE
de DELETE
işlemleri belirtme seçeneklerini içerir, ancak her biri isteğe bağlıdır.
SCD tür 1 güncelleştirmelerini işleme
Aşağıdaki örnekte SCD tür 1 güncelleştirmelerinin işlenmesi gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD tür 1 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Zambak | Cancun |
EK kayıtla SCD tür 1 örneğini TRUNCATE
çalıştırdıktan sonra, içindeki işlem sequenceNum=3
nedeniyle TRUNCATE
kayıtlar 124
kesilir 126
ve hedef tablo aşağıdaki kaydı içerir:
userId | Adı | şehir |
---|---|---|
125 | Mercedes | Guadalajara |
SCD tür 2 güncelleştirmelerini işleme
Aşağıdaki örnekte SCD tür 2 güncelleştirmelerinin işlenmesi gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD tür 2 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | boş |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | boş |
126 | Zambak | Cancun | 2 | boş |
SCD tür 2 sorgusu, hedef tablodaki geçmiş için izlenecek çıkış sütunlarının bir alt kümesini de belirtebilir. Diğer sütunlarda yapılan değişiklikler yeni geçmiş kayıtları oluşturmak yerine yerinde güncelleştirilir. Aşağıdaki örnek, sütunun izleme dışında tutulduğunu city
gösterir:
Aşağıdaki örnekte, SCD türü 2 ile izleme geçmişinin kullanılması gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Bu örneği ek TRUNCATE
kayıt olmadan çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | boş |
125 | Mercedes | Guadalajara | 2 | boş |
126 | Zambak | Cancun | 2 | boş |
Test verileri oluşturma
Aşağıdaki kod, bu öğreticide bulunan örnek sorgularda kullanılmak üzere örnek bir veri kümesi oluşturmak üzere sağlanmıştır. Yeni bir şema oluşturmak ve yeni bir tablo oluşturmak için uygun kimlik bilgilerine sahip olduğunuzu varsayarsak, bu deyimleri bir not defteri veya Databricks SQL ile çalıştırabilirsiniz. Aşağıdaki kodun Delta Live Tables işlem hattının bir parçası olarak çalıştırılması amaçlanmamıştır :
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Örnek: Düzenli anlık görüntü işleme
Aşağıdaki örnek, konumunda mycatalog.myschema.mytable
depolanan bir tablonun anlık görüntülerini alan SCD tür 2 işlemesini gösterir. İşleme sonuçları adlı target
bir tabloya yazılır.
mycatalog.myschema.mytable
2024-01-01 00:00:00 zaman damgasındaki kayıtlar
Anahtar | Değer |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
2024-01-01 12:00:00 zaman damgasındaki kayıtlar
Anahtar | Değer |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Anlık görüntüleri işledikten sonra hedef tablo aşağıdaki kayıtları içerir:
Anahtar | Değer | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | boş |
3 | a3 | 2024-01-01 12:00:00 | boş |
Örnek: Geçmiş anlık görüntü işleme
Aşağıdaki örnekte, bir bulut depolama sisteminde depolanan iki anlık görüntüden kaynak olayları temel alarak bir hedef tabloyu güncelleştiren SCD tür 2 işleme gösterilmektedir:
konumunda timestamp
depolanan anlık görüntü /<PATH>/filename1.csv
Anahtar | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
konumunda timestamp + 5
depolanan anlık görüntü /<PATH>/filename2.csv
Anahtar | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
Aşağıdaki kod örneği, bu anlık görüntülerle SCD tür 2 güncelleştirmelerinin işlenmesini gösterir:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Anlık görüntüleri işledikten sonra hedef tablo aşağıdaki kayıtları içerir:
Anahtar | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | boş |
3 | a3 | b3 | 2 | boş |
4 | a4 | b4_new | 1 | boş |
Hedef akış tablosuna veri ekleme, değiştirme veya silme
İşlem hattınız Tabloları Unity Kataloğu'nda yayımlıyorsa, deyimler tarafından APPLY CHANGES INTO
oluşturulan hedef akış tablolarını değiştirmek için ekleme, güncelleştirme, silme ve birleştirme deyimleri gibi veri işleme dili (DML) deyimlerini kullanabilirsiniz.
Not
- Akış tablosunun tablo şemasını değiştiren DML deyimleri desteklenmez. DML deyimlerinizin tablo şemasını geliştirmeye çalışmadığından emin olun.
- Akış tablosunu güncelleştiren DML deyimleri, Databricks Runtime 13.3 LTS ve üzeri kullanılarak yalnızca paylaşılan unity kataloğu kümesinde veya SQL ambarında çalıştırılabilir.
- Akış yalnızca ekleme veri kaynakları gerektirdiğinden, işlemeniz bir kaynak akış tablosundan değişiklik içeren akış gerektiriyorsa (örneğin, DML deyimlerine göre), kaynak akış tablosunu okurken skipChangeCommits bayrağını ayarlayın.
skipChangeCommits
Ayarlandığında, kaynak tablodaki kayıtları silen veya değiştiren işlemler yoksayılır. İşlemeniz bir akış tablosu gerektirmiyorsa, hedef tablo olarak gerçekleştirilmiş bir görünüm (yalnızca ekleme kısıtlaması yoktur) kullanabilirsiniz.
Delta Live Tables belirtilen SEQUENCE BY
bir sütun kullandığından ve hedef tablonun ve __END_AT
sütunlarına __START_AT
uygun sıralama değerlerini yaydığından (SCD türü 2 için), DML deyimlerinin kayıtların düzgün sıralamasını korumak için bu sütunlar için geçerli değerler kullandığından emin olmanız gerekir. Bkz. CDC, DEĞIŞIKLIKLERI UYGULA API'siyle nasıl uygulanır?.
Akış tablolarıyla DML deyimlerini kullanma hakkında daha fazla bilgi için bkz . Akış tablosunda veri ekleme, değiştirme veya silme.
Aşağıdaki örnek, başlangıç dizisi 5 olan etkin bir kayıt ekler:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Hedef tablodan APPLY CHANGES
değişiklik veri akışını okuma
Databricks Runtime 15.2 ve üzeri sürümlerde, diğer Delta tablolarından değişiklik veri akışını okuduğunuz gibi veya APPLY CHANGES FROM SNAPSHOT
sorgularının APPLY CHANGES
hedefi olan bir akış tablosundan değişiklik veri akışını okuyabilirsiniz. Bir hedef akış tablosundan değişiklik veri akışını okumak için aşağıdakiler gereklidir:
- Hedef akış tablosunun Unity Kataloğu'nda yayımlanması gerekir. Bkz . Delta Live Tables işlem hatlarınızla Unity Kataloğu'nu kullanma.
- Veri akışını hedef akış tablosundan okumak için Databricks Runtime 15.2 veya üzerini kullanmanız gerekir. Değişiklik veri akışını başka bir Delta Live Tables işlem hattında okumak için işlem hattının Databricks Runtime 15.2 veya üzerini kullanacak şekilde yapılandırılması gerekir.
Delta Live Tables işlem hattında oluşturulan bir hedef akış tablosundaki değişiklik veri akışını, diğer Delta tablolarından değişiklik veri akışını okumakla aynı şekilde okursunuz. Python ve SQL örnekleri de dahil olmak üzere Delta değişiklik veri akışı işlevini kullanma hakkında daha fazla bilgi edinmek için bkz . Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
Not
Değişiklik veri akışı kaydı, değişiklik olayının türünü tanımlayan meta verileri içerir. Tablodaki bir kayıt güncelleştirildiğinde, ilişkili değişiklik kayıtlarının meta verileri genellikle ve update_postimage
olayları olarak update_preimage
ayarlanan değerleri içerir_change_type
.
Ancak, birincil anahtar değerlerinin değiştirilmesini _change_type
içeren hedef akış tablosunda güncelleştirmeler yapıldığında değerler farklıdır. Değişiklikler birincil anahtar güncelleştirmelerini içerdiğinde_change_type
, meta veri alanları ve delete
olayları olarak insert
ayarlanır. Birincil anahtarlarda yapılan değişiklikler, veya MERGE
deyimiyle anahtar alanlarından birinde el ile UPDATE
güncelleştirmeler yapıldığında veya SCD tür 2 tabloları için alan önceki bir başlangıç dizisi değerini yansıtacak şekilde değiştiğinde __start_at
oluşabilir.
Sorgu APPLY CHANGES
, SCD tür 1 ve SCD tür 2 işleme için farklılık gösteren birincil anahtar değerlerini belirler:
- SCD tür 1 işleme ve Delta Live Tables Python arabirimi için birincil anahtar işlevdeki
apply_changes()
parametreninkeys
değeridir. Delta Live Tables SQL arabirimi için birincil anahtar, deyimindeki yan tümcesiKEYS
APPLY CHANGES INTO
tarafından tanımlanan sütunlardır. - SCD tür 2 için birincil anahtar parametre veya
KEYS
yan tümce artı işlemdencoalesce(__START_AT, __END_AT)
dönüş değeridirkeys
; burada__START_AT
ve__END_AT
hedef akış tablosundan karşılık gelen sütunlardır.
Delta Live Tables CDC sorgusu tarafından işlenen kayıtlar hakkında veri alma
Not
Aşağıdaki ölçümler sorgular tarafından APPLY CHANGES
değil yalnızca sorgular tarafından APPLY CHANGES FROM SNAPSHOT
yakalanır.
Aşağıdaki ölçümler sorgular tarafından APPLY CHANGES
yakalanır:
num_upserted_rows
: Güncelleştirme sırasında veri kümesine ekli çıkış satırlarının sayısı.num_deleted_rows
: Güncelleştirme sırasında veri kümesinden silinen mevcut çıkış satırlarının sayısı.
num_output_rows
CDC olmayan akışların çıktısı olan ölçüm sorgular için apply changes
yakalanmaz.
Delta Live Tables CDC işlemesi için hangi veri nesneleri kullanılır?
Not: Aşağıdaki veri yapıları yalnızca işleme için geçerlidir, işleme için APPLY CHANGES
geçerli değildir APPLY CHANGES FROM SNAPSHOT
.
Hive meta veri deposunda hedef tabloyu bildirdiğinizde iki veri yapısı oluşturulur:
- Hedef tabloya atanan adı kullanan bir görünüm.
- DELTA Live Tables tarafından CDC işlemeyi yönetmek için kullanılan iç yedekleme tablosu. Bu tablo, hedef tablo adına önceden bağlama
__apply_changes_storage_
tarafından adlandırılır.
Örneğin, adlı dlt_cdc_target
bir hedef tablo bildirirseniz, meta veri deposunda adlı dlt_cdc_target
bir görünüm ve adlı __apply_changes_storage_dlt_cdc_target
bir tablo görürsünüz. Görünüm oluşturmak, Delta Live Tablolarının sıra dışı verileri işlemek için gereken ek bilgileri (örneğin, kaldırıldı taşlarını ve sürümleri) filtrelemesini sağlar. İşlenen verileri görüntülemek için hedef görünümü sorgular. Tablonun şeması __apply_changes_storage_
gelecekteki özellikleri veya geliştirmeleri destekleyecek şekilde değişebileceğinden, tabloyu üretim kullanımı için sorgulamamalısınız. Tabloya el ile veri eklerseniz, sürüm sütunları eksik olduğundan kayıtların diğer değişikliklerden önce geldiği varsayılır.
İşlem hattı Unity Kataloğu'nda yayımlanırsa, iç yedekleme tablolarına kullanıcılar erişemez.