Aracılığıyla paylaş


DEĞİşİkLİkLERİ UYGULAMA API'Sİ: Delta Live Tablolarında değişiklik veri yakalamayı basitleştirme

Delta Live Tables, API ile değişiklik veri yakalamayı APPLY CHANGES (CDC) basitleştirir. MERGE INTO Daha önce deyimi, Azure Databricks'te CDC kayıtlarını işlemek için yaygın olarak kullanılıyordu. Ancak, MERGE INTO sıra dışı kayıtlar nedeniyle yanlış sonuçlar verebilir veya kayıtları yeniden sıralamak için karmaşık mantık gerektirebilir.

Delta Live Tables'daki API, APPLY CHANGES sıra dışı kayıtları otomatik olarak işleyerek CDC kayıtlarının doğru işlenmesini sağlar ve sıra dışı kayıtları işlemek için karmaşık mantık geliştirme gereksinimini ortadan kaldırır.

APPLY CHANGES API, DELTA Live Tables SQL ve Python arabirimlerinde desteklenir; scd türü 1 ve tür 2 ile tabloları güncelleştirme desteği de dahil olmak üzere:

  • Kayıtları doğrudan güncelleştirmek için SCD tür 1'i kullanın. Güncelleştirilen kayıtlar için geçmiş korunmaz.
  • Tüm güncelleştirmelerde veya belirtilen bir sütun kümesindeki güncelleştirmelerde kayıtların geçmişini korumak için SCD tür 2'yi kullanın.

Söz dizimi ve diğer başvurular için bkz:

Not

Bu makalede, Delta Live Tables işlem hattınızdaki tabloların kaynak verilerdeki değişikliklere göre nasıl güncelleştirildiği açıklanır. Delta tabloları için satır düzeyi değişiklik bilgilerini kaydetmeyi ve sorgulamayı öğrenmek için bkz . Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.

CDC, Delta Live Tables ile nasıl uygulanır?

Kaynak verilerde kayıtların sıralanması için Delta Live Tablolarının kaynak verilerin düzgün sıralanması için monoton olarak artan bir temsil olarak yorumladığı bir sütun belirtmeniz gerekir. Delta Live Tables, sıra dışı gelen verileri otomatik olarak işler. SCD Tür 2 değişiklikleri için Delta Live Tables, hedef tablonun ve __END_AT sütunlarına __START_AT uygun sıralama değerlerini yayılım. Her sıralama değerinde anahtar başına ayrı bir güncelleştirme olmalıdır ve NULL sıralama değerleri desteklenmez.

Delta Live Tables ile CDC işlemesi gerçekleştirmek için önce bir akış tablosu oluşturur ve ardından değişiklik akışı için kaynağı, anahtarları ve sıralamayı belirtmek üzere bir APPLY CHANGES INTO deyimi kullanırsınız. Hedef akış tablosunu oluşturmak için SQL'deki deyimini CREATE OR REFRESH STREAMING TABLE veya create_streaming_table() Python'daki işlevini kullanın. CDC işlemesini tanımlayan deyimini oluşturmak için SQL'deki deyimini veya apply_changes() Python'daki işlevini kullanınAPPLY CHANGES. Söz dizimi ayrıntıları için bkz. Delta Live Tablolarında SQL ile veri yakalamayı değiştirme veya Delta Live Tablolarında Python ile veri yakalamayı değiştirme.

Delta Live Tables CDC işlemesi için hangi veri nesneleri kullanılır?

Hive meta veri deposunda hedef tabloyu bildirdiğinizde iki veri yapısı oluşturulur:

  • Hedef tabloya atanan adı kullanan bir görünüm.
  • DELTA Live Tables tarafından CDC işlemeyi yönetmek için kullanılan iç yedekleme tablosu. Bu tablo, hedef tablo adına önceden bağlama __apply_changes_storage_ tarafından adlandırılır.

Örneğin, adlı dlt_cdc_targetbir hedef tablo bildirirseniz, meta veri deposunda adlı dlt_cdc_target bir görünüm ve adlı __apply_changes_storage_dlt_cdc_target bir tablo görürsünüz. Görünüm oluşturmak, Delta Live Tablolarının sıra dışı verileri işlemek için gereken ek bilgileri (örneğin, kaldırıldı taşlarını ve sürümleri) filtrelemesini sağlar. İşlenen verileri görüntülemek için hedef görünümü sorgular. Tablonun şeması __apply_changes_storage_ gelecekteki özellikleri veya geliştirmeleri destekleyecek şekilde değişebileceğinden, tabloyu üretim kullanımı için sorgulamamalısınız. Tabloya el ile veri eklerseniz, sürüm sütunları eksik olduğundan kayıtların diğer değişikliklerden önce geldiği varsayılır.

İşlem hattı Unity Kataloğu'na yayımlanırsa, iç yedekleme tablolarına kullanıcılar erişilemez.

Delta Live Tables CDC sorgusu tarafından işlenen kayıtlar hakkında veri alma

Aşağıdaki ölçümler sorgular tarafından apply changes yakalanır:

  • num_upserted_rows: Güncelleştirme sırasında veri kümesine ekli çıkış satırlarının sayısı.
  • num_deleted_rows: Güncelleştirme sırasında veri kümesinden silinen mevcut çıkış satırlarının sayısı.

num_output_rows CDC olmayan akışların çıkışı olan ölçüm sorgular için apply changes yakalanmaz.

Sınırlamalar

Sorgunun veya apply_changes işlevin APPLY CHANGES INTO hedefi akış tablosu için kaynak olarak kullanılamaz. Sorgunun veya apply_changes işlevin hedefinden okunan bir APPLY CHANGES INTO tablo gerçekleştirilmiş bir görünüm olmalıdır.

Azure Databricks'te SCD tür 1 ve SCD tür 2

Aşağıdaki bölümlerde, hedef tabloları kaynak olaylara göre güncelleştiren Delta Live Tables SCD tür 1 ve tür 2 sorgularını gösteren örnekler verilmiştir:

  1. Yeni kullanıcı kayıtları oluşturun.
  2. Kullanıcı kaydını silme.
  3. Kullanıcı kayıtlarını güncelleştirin. SCD tür 1 örneğinde, son UPDATE işlemler geç gelir ve sıra dışı olayların işlenmesini gösteren hedef tablodan bırakılır.

Aşağıdaki örneklerde Delta Live Tables işlem hatlarını yapılandırma ve güncelleştirme konusunda bilgi sahibi olduğunuz varsayılır. Bkz . Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma.

Bu örnekleri çalıştırmak için bir örnek veri kümesi oluşturarak başlamalısınız. Bkz . Test verileri oluşturma.

Bu örneklerin giriş kayıtları aşağıda verilmiştir:

userId Adı şehir operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null SİL 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Örnek verilerdeki son satırın açıklamasını kaldırdığınızda, kayıtların nerede kesilmesi gerektiğini belirten aşağıdaki kaydı ekler:

userId Adı şehir operation sequenceNum
null null null TRUNCATE 3

Not

Aşağıdaki örneklerin tümü hem hem de DELETETRUNCATE işlemleri belirtme seçeneklerini içerir, ancak bunların her biri isteğe bağlıdır.

SCD tür 1 güncelleştirmelerini işleme

Aşağıdaki kod örneğinde SCD tür 1 güncelleştirmelerinin işlenmesi gösterilmektedir:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCD tür 1 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:

userId Adı şehir
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

EK kayıtla SCD tür 1 örneğini TRUNCATE çalıştırdıktan sonra, içindeki işlem sequenceNum=3nedeniyle TRUNCATE kayıtlar 124 kesilir 126 ve hedef tablo aşağıdaki kaydı içerir:

userId Adı şehir
125 Mercedes Guadalajara

SCD tür 2 güncelleştirmelerini işleme

Aşağıdaki kod örneğinde SCD tür 2 güncelleştirmelerinin işlenmesi gösterilmektedir:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCD tür 2 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:

userId Adı şehir __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 boş
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 boş
126 Lily Cancun 2 boş

SCD tür 2 sorgusu, hedef tablodaki geçmiş için izlenecek çıkış sütunlarının bir alt kümesini de belirtebilir. Diğer sütunlarda yapılan değişiklikler yeni geçmiş kayıtları oluşturmak yerine yerinde güncelleştirilir. Aşağıdaki örnek, sütunun izleme dışında tutulduğunu city gösterir:

Aşağıdaki örnekte, SCD türü 2 ile izleme geçmişinin kullanılması gösterilmektedir:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Bu örneği ek TRUNCATE kayıt olmadan çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:

userId Adı şehir __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 boş
125 Mercedes Guadalajara 2 boş
126 Lily Cancun 2 boş

Test verileri oluşturma

Aşağıdaki kod, bu öğreticide bulunan örnek sorgularda kullanılmak üzere örnek bir veri kümesi oluşturmak üzere sağlanmıştır. Yeni bir şema oluşturmak ve yeni bir tablo oluşturmak için uygun kimlik bilgilerine sahip olduğunuzu varsayarsak, bu deyimleri bir not defteri veya Databricks SQL ile yürütebilirsiniz. Aşağıdaki kodun Delta Live Tables işlem hattının bir parçası olarak çalıştırılması amaçlanmamıştır :

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Hedef akış tablosuna veri ekleme, değiştirme veya silme

İşlem hattınız Tabloları Unity Kataloğu'nda yayımlıyorsa, deyimler tarafından APPLY CHANGES INTO oluşturulan hedef akış tablolarını değiştirmek için ekleme, güncelleştirme, silme ve birleştirme deyimleri gibi veri işleme dili (DML) deyimlerini kullanabilirsiniz.

Not

  • Akış tablosunun tablo şemasını değiştiren DML deyimleri desteklenmez. DML deyimlerinizin tablo şemasını geliştirmeye çalışmadığından emin olun.
  • Akış tablosunu güncelleştiren DML deyimleri, Databricks Runtime 13.3 LTS ve üzeri kullanılarak yalnızca paylaşılan unity kataloğu kümesinde veya SQL ambarında çalıştırılabilir.
  • Akış yalnızca ekleme veri kaynakları gerektirdiğinden, işlemeniz bir kaynak akış tablosundan değişiklik içeren akış gerektiriyorsa (örneğin, DML deyimlerine göre), kaynak akış tablosunu okurken skipChangeCommits bayrağını ayarlayın. skipChangeCommits Ayarlandığında, kaynak tablodaki kayıtları silen veya değiştiren işlemler yoksayılır. İşlemeniz bir akış tablosu gerektirmiyorsa, hedef tablo olarak gerçekleştirilmiş bir görünüm (yalnızca ekleme kısıtlaması yoktur) kullanabilirsiniz.

Delta Live Tables belirtilen SEQUENCE BY bir sütun kullandığından ve hedef tablonun ve __END_AT sütunlarına __START_AT uygun sıralama değerlerini yaydığından (SCD türü 2 için), DML deyimlerinin kayıtların düzgün sıralamasını korumak için bu sütunlar için geçerli değerler kullandığından emin olmanız gerekir. Bkz. CDC Delta Live Tablolarıyla nasıl uygulanır?.

Akış tablolarıyla DML deyimlerini kullanma hakkında daha fazla bilgi için bkz . Akış tablosunda veri ekleme, değiştirme veya silme.

Aşağıdaki örnek, başlangıç dizisi 5 olan etkin bir kayıt ekler:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);