DEĞİşİkLİkLERİ UYGULAMA API'Sİ: Delta Live Tablolarında değişiklik veri yakalamayı basitleştirme
Delta Live Tables, API ile değişiklik veri yakalamayı APPLY CHANGES
(CDC) basitleştirir. MERGE INTO
Daha önce deyimi, Azure Databricks'te CDC kayıtlarını işlemek için yaygın olarak kullanılıyordu. Ancak, MERGE INTO
sıra dışı kayıtlar nedeniyle yanlış sonuçlar verebilir veya kayıtları yeniden sıralamak için karmaşık mantık gerektirebilir.
Delta Live Tables'daki API, APPLY CHANGES
sıra dışı kayıtları otomatik olarak işleyerek CDC kayıtlarının doğru işlenmesini sağlar ve sıra dışı kayıtları işlemek için karmaşık mantık geliştirme gereksinimini ortadan kaldırır.
APPLY CHANGES
API, DELTA Live Tables SQL ve Python arabirimlerinde desteklenir; scd türü 1 ve tür 2 ile tabloları güncelleştirme desteği de dahil olmak üzere:
- Kayıtları doğrudan güncelleştirmek için SCD tür 1'i kullanın. Güncelleştirilen kayıtlar için geçmiş korunmaz.
- Tüm güncelleştirmelerde veya belirtilen bir sütun kümesindeki güncelleştirmelerde kayıtların geçmişini korumak için SCD tür 2'yi kullanın.
Söz dizimi ve diğer başvurular için bkz:
- Delta Live Tablolarında Python ile veri yakalamayı değiştirme
- Delta Live Tablolarında SQL ile veri yakalamayı değiştirme
- SCD tür 1 sorguları için silinmiş öğe yönetimini denetleme
Not
Bu makalede, Delta Live Tables işlem hattınızdaki tabloların kaynak verilerdeki değişikliklere göre nasıl güncelleştirildiği açıklanır. Delta tabloları için satır düzeyi değişiklik bilgilerini kaydetmeyi ve sorgulamayı öğrenmek için bkz . Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
CDC, Delta Live Tables ile nasıl uygulanır?
Kaynak verilerde kayıtların sıralanması için Delta Live Tablolarının kaynak verilerin düzgün sıralanması için monoton olarak artan bir temsil olarak yorumladığı bir sütun belirtmeniz gerekir. Delta Live Tables, sıra dışı gelen verileri otomatik olarak işler. SCD Tür 2 değişiklikleri için Delta Live Tables, hedef tablonun ve __END_AT
sütunlarına __START_AT
uygun sıralama değerlerini yayılım. Her sıralama değerinde anahtar başına ayrı bir güncelleştirme olmalıdır ve NULL sıralama değerleri desteklenmez.
Delta Live Tables ile CDC işlemesi gerçekleştirmek için önce bir akış tablosu oluşturur ve ardından değişiklik akışı için kaynağı, anahtarları ve sıralamayı belirtmek üzere bir APPLY CHANGES INTO
deyimi kullanırsınız. Hedef akış tablosunu oluşturmak için SQL'deki deyimini CREATE OR REFRESH STREAMING TABLE
veya create_streaming_table()
Python'daki işlevini kullanın. CDC işlemesini tanımlayan deyimini oluşturmak için SQL'deki deyimini veya apply_changes()
Python'daki işlevini kullanınAPPLY CHANGES
. Söz dizimi ayrıntıları için bkz. Delta Live Tablolarında SQL ile veri yakalamayı değiştirme veya Delta Live Tablolarında Python ile veri yakalamayı değiştirme.
Delta Live Tables CDC işlemesi için hangi veri nesneleri kullanılır?
Hive meta veri deposunda hedef tabloyu bildirdiğinizde iki veri yapısı oluşturulur:
- Hedef tabloya atanan adı kullanan bir görünüm.
- DELTA Live Tables tarafından CDC işlemeyi yönetmek için kullanılan iç yedekleme tablosu. Bu tablo, hedef tablo adına önceden bağlama
__apply_changes_storage_
tarafından adlandırılır.
Örneğin, adlı dlt_cdc_target
bir hedef tablo bildirirseniz, meta veri deposunda adlı dlt_cdc_target
bir görünüm ve adlı __apply_changes_storage_dlt_cdc_target
bir tablo görürsünüz. Görünüm oluşturmak, Delta Live Tablolarının sıra dışı verileri işlemek için gereken ek bilgileri (örneğin, kaldırıldı taşlarını ve sürümleri) filtrelemesini sağlar. İşlenen verileri görüntülemek için hedef görünümü sorgular. Tablonun şeması __apply_changes_storage_
gelecekteki özellikleri veya geliştirmeleri destekleyecek şekilde değişebileceğinden, tabloyu üretim kullanımı için sorgulamamalısınız. Tabloya el ile veri eklerseniz, sürüm sütunları eksik olduğundan kayıtların diğer değişikliklerden önce geldiği varsayılır.
İşlem hattı Unity Kataloğu'na yayımlanırsa, iç yedekleme tablolarına kullanıcılar erişilemez.
Delta Live Tables CDC sorgusu tarafından işlenen kayıtlar hakkında veri alma
Aşağıdaki ölçümler sorgular tarafından apply changes
yakalanır:
num_upserted_rows
: Güncelleştirme sırasında veri kümesine ekli çıkış satırlarının sayısı.num_deleted_rows
: Güncelleştirme sırasında veri kümesinden silinen mevcut çıkış satırlarının sayısı.
num_output_rows
CDC olmayan akışların çıkışı olan ölçüm sorgular için apply changes
yakalanmaz.
Sınırlamalar
Sorgunun veya apply_changes
işlevin APPLY CHANGES INTO
hedefi akış tablosu için kaynak olarak kullanılamaz. Sorgunun veya apply_changes
işlevin hedefinden okunan bir APPLY CHANGES INTO
tablo gerçekleştirilmiş bir görünüm olmalıdır.
Azure Databricks'te SCD tür 1 ve SCD tür 2
Aşağıdaki bölümlerde, hedef tabloları kaynak olaylara göre güncelleştiren Delta Live Tables SCD tür 1 ve tür 2 sorgularını gösteren örnekler verilmiştir:
- Yeni kullanıcı kayıtları oluşturun.
- Kullanıcı kaydını silme.
- Kullanıcı kayıtlarını güncelleştirin. SCD tür 1 örneğinde, son
UPDATE
işlemler geç gelir ve sıra dışı olayların işlenmesini gösteren hedef tablodan bırakılır.
Aşağıdaki örneklerde Delta Live Tables işlem hatlarını yapılandırma ve güncelleştirme konusunda bilgi sahibi olduğunuz varsayılır. Bkz . Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma.
Bu örnekleri çalıştırmak için bir örnek veri kümesi oluşturarak başlamalısınız. Bkz . Test verileri oluşturma.
Bu örneklerin giriş kayıtları aşağıda verilmiştir:
userId | Adı | şehir | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | SİL | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Örnek verilerdeki son satırın açıklamasını kaldırdığınızda, kayıtların nerede kesilmesi gerektiğini belirten aşağıdaki kaydı ekler:
userId | Adı | şehir | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Not
Aşağıdaki örneklerin tümü hem hem de DELETE
TRUNCATE
işlemleri belirtme seçeneklerini içerir, ancak bunların her biri isteğe bağlıdır.
SCD tür 1 güncelleştirmelerini işleme
Aşağıdaki kod örneğinde SCD tür 1 güncelleştirmelerinin işlenmesi gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD tür 1 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
EK kayıtla SCD tür 1 örneğini TRUNCATE
çalıştırdıktan sonra, içindeki işlem sequenceNum=3
nedeniyle TRUNCATE
kayıtlar 124
kesilir 126
ve hedef tablo aşağıdaki kaydı içerir:
userId | Adı | şehir |
---|---|---|
125 | Mercedes | Guadalajara |
SCD tür 2 güncelleştirmelerini işleme
Aşağıdaki kod örneğinde SCD tür 2 güncelleştirmelerinin işlenmesi gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD tür 2 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | boş |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | boş |
126 | Lily | Cancun | 2 | boş |
SCD tür 2 sorgusu, hedef tablodaki geçmiş için izlenecek çıkış sütunlarının bir alt kümesini de belirtebilir. Diğer sütunlarda yapılan değişiklikler yeni geçmiş kayıtları oluşturmak yerine yerinde güncelleştirilir. Aşağıdaki örnek, sütunun izleme dışında tutulduğunu city
gösterir:
Aşağıdaki örnekte, SCD türü 2 ile izleme geçmişinin kullanılması gösterilmektedir:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Bu örneği ek TRUNCATE
kayıt olmadan çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
userId | Adı | şehir | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | boş |
125 | Mercedes | Guadalajara | 2 | boş |
126 | Lily | Cancun | 2 | boş |
Test verileri oluşturma
Aşağıdaki kod, bu öğreticide bulunan örnek sorgularda kullanılmak üzere örnek bir veri kümesi oluşturmak üzere sağlanmıştır. Yeni bir şema oluşturmak ve yeni bir tablo oluşturmak için uygun kimlik bilgilerine sahip olduğunuzu varsayarsak, bu deyimleri bir not defteri veya Databricks SQL ile yürütebilirsiniz. Aşağıdaki kodun Delta Live Tables işlem hattının bir parçası olarak çalıştırılması amaçlanmamıştır :
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Hedef akış tablosuna veri ekleme, değiştirme veya silme
İşlem hattınız Tabloları Unity Kataloğu'nda yayımlıyorsa, deyimler tarafından APPLY CHANGES INTO
oluşturulan hedef akış tablolarını değiştirmek için ekleme, güncelleştirme, silme ve birleştirme deyimleri gibi veri işleme dili (DML) deyimlerini kullanabilirsiniz.
Not
- Akış tablosunun tablo şemasını değiştiren DML deyimleri desteklenmez. DML deyimlerinizin tablo şemasını geliştirmeye çalışmadığından emin olun.
- Akış tablosunu güncelleştiren DML deyimleri, Databricks Runtime 13.3 LTS ve üzeri kullanılarak yalnızca paylaşılan unity kataloğu kümesinde veya SQL ambarında çalıştırılabilir.
- Akış yalnızca ekleme veri kaynakları gerektirdiğinden, işlemeniz bir kaynak akış tablosundan değişiklik içeren akış gerektiriyorsa (örneğin, DML deyimlerine göre), kaynak akış tablosunu okurken skipChangeCommits bayrağını ayarlayın.
skipChangeCommits
Ayarlandığında, kaynak tablodaki kayıtları silen veya değiştiren işlemler yoksayılır. İşlemeniz bir akış tablosu gerektirmiyorsa, hedef tablo olarak gerçekleştirilmiş bir görünüm (yalnızca ekleme kısıtlaması yoktur) kullanabilirsiniz.
Delta Live Tables belirtilen SEQUENCE BY
bir sütun kullandığından ve hedef tablonun ve __END_AT
sütunlarına __START_AT
uygun sıralama değerlerini yaydığından (SCD türü 2 için), DML deyimlerinin kayıtların düzgün sıralamasını korumak için bu sütunlar için geçerli değerler kullandığından emin olmanız gerekir. Bkz. CDC Delta Live Tablolarıyla nasıl uygulanır?.
Akış tablolarıyla DML deyimlerini kullanma hakkında daha fazla bilgi için bkz . Akış tablosunda veri ekleme, değiştirme veya silme.
Aşağıdaki örnek, başlangıç dizisi 5 olan etkin bir kayıt ekler:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);