Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Lakeflow Spark Bildirimli Boru Hatları (SDP), AUTO CDC ve AUTO CDC FROM SNAPSHOT API'leri ile değişiklik verilerini yakalamayı (CDC) basitleştirir.
Uyarı
API'ler AUTO CDC API'lerin APPLY CHANGES yerini alır ve aynı söz dizimine sahiptir.
APPLY CHANGES API'ler hala kullanılabilir durumdadır, ancak Databricks yerine AUTO CDC API'lerin kullanılmasını önerir.
Kullandığınız arabirim, değişiklik verilerinin kaynağına bağlıdır:
- Değişiklik veri akışındaki (CDF) değişiklikleri işlemek için kullanın
AUTO CDC. - Veritabanı anlık görüntülerindeki değişiklikleri işlemek için (Genel Önizleme ve yalnızca Python için kullanılabilir) kullanın
AUTO CDC FROM SNAPSHOT.
MERGE INTO Daha önce deyimi, Azure Databricks'te CDC kayıtlarını işlemek için yaygın olarak kullanılıyordu. Ancak, MERGE INTO sıra dışı kayıtlar nedeniyle yanlış sonuçlar verebilir veya kayıtları yeniden sıralamak için karmaşık mantık gerektirir.
AUTO CDC API, pipelline SQL ve Python arabirimlerinde desteklenir.
AUTO CDC FROM SNAPSHOT API, Python arabiriminde desteklenir. API'ler AUTO CDC Apache Spark Bildirimli İşlem Hatları tarafından desteklenmez.
SCD tür 1 ve tür 2 kullanarak tabloların güncelleştirilmesini hem AUTO CDC hem de AUTO CDC FROM SNAPSHOT destekler.
- Kayıtları doğrudan güncelleştirmek için SCD tür 1'i kullanın. Güncelleştirilmiş kayıtlar için geçmiş korunmaz.
- Tüm güncelleştirmelerde veya belirtilen bir sütun kümesindeki güncelleştirmelerde kayıtların geçmişini korumak için SCD tür 2'yi kullanın.
Söz dizimi ve diğer başvurular için bkz. İşlem hatları için AUTO CDC (SQL), işlem hatları için AUTO CDC (Python) ve işlem hatları için AUTO CDC FROM SNAPSHOT (Python).
Uyarı
Bu makalede, kaynak verilerdeki değişikliklere göre işlem hatlarınızdaki tabloların nasıl güncelleştirildiği açıklanır. Delta tabloları için satır düzeyi değişiklik bilgilerini kaydetmeyi ve sorgulamayı öğrenmek için bkz. Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
Gereksinimler
CDC API'lerini kullanmak için işlem hattınızın sunucusuz SDP veya SDPPro ya da Advanced sürümleri kullanacak şekilde yapılandırılması gerekir.
CDC, AUTO CDC API'siyle nasıl uygulanır?
AUTO CDC API'si, sıra dışı kayıtları otomatik olarak işleyerek CDC kayıtlarının doğru işlenmesini sağlar ve sıra dışı kayıtları işlemek için karmaşık mantık geliştirme gereksinimini ortadan kaldırır. **
Kayıtların sıralanması için kaynak verilerde, API'lerin kaynak verilerin doğru sıralanması için monoton olarak artan bir temsil şeklinde yorumladığı bir sütunu belirtmeniz gerekir. Veri boru hatları, sırasız gelen verileri otomatik olarak işler. SCD tür 2 değişiklikleri için işlem hatları sıralama değerlerini hedef tablonun __START_AT ve __END_AT sütunlarına uygun şekilde yaymaktadır. Her sıralama değerinde anahtar başına ayrı bir güncelleştirme olmalıdır ve NULL sıralama değerleri desteklenmez.
AUTO CDC ile CDC işlemesi gerçekleştirmek için, önce bir akış tablosu oluşturur ve ardından SQL'deki AUTO CDC ... INTO deyimini veya Python'daki create_auto_cdc_flow() işlevini kullanarak değişiklik akışının kaynağını, anahtarlarını ve sıralamasını belirtirsiniz. Hedef akış tablosunu oluşturmak için SQL'deki CREATE OR REFRESH STREAMING TABLE deyimini veya Python'daki create_streaming_table() işlevini kullanın.
SCD tür 1 ve tür 2 işleme örneklerine bakın.
Söz dizimi ayrıntıları için bkz. pipeline'lar SQL referansı veya Python referansı.
CDC, AUTO CDC FROM SNAPSHOT API ile nasıl uygulanır?
Önemli
AUTO CDC FROM SNAPSHOT API Genel Kullanıma Açık Önizleme aşamasındadır.
AUTO CDC FROM SNAPSHOT , bir dizi sıralı anlık görüntüyü karşılaştırarak kaynak verilerdeki değişiklikleri verimli bir şekilde belirleyen ve ardından anlık görüntülerdeki kayıtların CDC işlenmesi için gereken işlemeyi çalıştıran bildirim temelli bir API'dir.
AUTO CDC FROM SNAPSHOT yalnızca Python işlem hatları arabirimi tarafından desteklenir.
AUTO CDC FROM SNAPSHOT birden çok kaynak türünden anlık görüntü alma işlemini destekler.
- Mevcut bir tablo veya görünümden anlık görüntüleri almak için düzenli anlık görüntü alımını kullanın.
AUTO CDC FROM SNAPSHOTmevcut bir veritabanı nesnesinden anlık görüntüleri düzenli aralıklarla alma desteği için basit, kolaylaştırılmış bir arabirime sahiptir. Her işlem hattı güncelleştirmesi ile yeni bir anlık görüntü alınır ve alım zamanı anlık görüntü sürümü olarak kullanılır. Bir işlem hattı sürekli modda çalıştırıldığında, işlemeyi içeren akışın tetikleyici aralığı ayarı tarafından belirlenen bir dönemde her işlem hattı güncelleştirmesi ile birden çok anlık görüntü alınmaktadırAUTO CDC FROM SNAPSHOT. - Oracle veya MySQL veritabanından ya da veri ambarından oluşturulan anlık görüntüler gibi veritabanı anlık görüntülerini içeren dosyaları işlemek için geçmiş anlık görüntü alımını kullanın.
herhangi bir AUTO CDC FROM SNAPSHOT kaynağından CDC işlemesi gerçekleştirmek için önce bir akış tablosu oluşturur ve ardından Python'da create_auto_cdc_from_snapshot_flow() işlevini kullanarak anlık görüntüyü, anahtarları ve diğer gerekli bağımsız değişkenleri belirtirsiniz.
Düzenli anlık görüntü alımı ve geçmiş anlık görüntü alımı örneklerine bakın.
API'ye geçirilen anlık görüntülerin sürüme göre artan sırada olması gerekir. SDP sıra dışı bir anlık görüntü algılarsa bir hata oluşur.
Söz dizimi ayrıntıları için bkz. işlem hatları Python başvurusu.
Sıralama için birden çok sütun kullanma
Birden çok sütuna göre sıralayabilirsiniz (örneğin, bir zaman damgası ve bağlantıları kesmek için bir kimlik), bunları birleştirmek için bir STRUCT kullanabilirsiniz: önce STRUCT'nin ilk alanına göre sipariş verir ve bir bağlama durumunda ikinci alanı dikkate alır vb.
SQL örneği:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python örneği:
sequence_by = struct("timestamp_col", "id_col")
Sınırlamalar
Sıralama için kullanılan sütun sıralanabilir bir veri türü olmalıdır.
Örnek: CDF kaynak verileriyle SCD tür 1 ve SCD tür 2 işleme
Aşağıdaki bölümlerde, bir değişiklik veri akışındaki kaynak olayları temel alarak hedef tabloları güncelleştiren SCD tür 1 ve tür 2 sorgularının örnekleri verilmiştir:
- Yeni kullanıcı kayıtları oluşturur.
- Kullanıcı kaydını siler.
- Kullanıcı kayıtlarını güncelleştirir. SCD türü 1 örneğinde, son
UPDATEişlemleri geç gelir ve bu yüzden hedef tablodan kaldırılarak sıra dışı olayların nasıl işlendiği gösterilir.
Aşağıdaki örneklerde işlem hatlarını yapılandırma ve güncelleştirme konusunda bilgi sahibi olduğunuz varsayılır. Bkz. Öğretici: Değişiklik verilerini yakalamayı kullanarak ETL işlem hattı oluşturma.
Bu örnekleri çalıştırmak için bir örnek veri kümesi oluşturarak başlamalısınız. Bkz. Test verileri oluşturma.
Bu örneklerin giriş kayıtları aşağıda verilmiştir:
| userId | name | city | Operasyon | sıraNumarası |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Zambak | Cancun | INSERT | 2 |
| 123 | sıfır | sıfır | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Örnek verilerdeki son satırı yorumdan çıkardığınızda, kayıtların nerede kesileceğini belirten aşağıdaki kaydı ekler:
| userId | name | city | Operasyon | sıraNumarası |
|---|---|---|---|---|
| sıfır | sıfır | sıfır | TRUNCATE | 3 |
Uyarı
Aşağıdaki örneklerin tümü hem hem DELETE de TRUNCATE işlemleri belirtme seçeneklerini içerir, ancak her biri isteğe bağlıdır.
SCD tür 1 güncelleştirmelerini işleme
Aşağıdaki örnekte SCD tür 1 güncelleştirmelerinin işlenmesi gösterilmektedir:
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD tür 1 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Zambak | Cancun |
SCD tür 1 örneğini ek TRUNCATE kayıtla çalıştırdıktan sonra, 124 işleminin 126 konumunda olması nedeniyle TRUNCATE ve sequenceNum=3 kayıtları kesilir ve hedef tablo aşağıdaki kaydı içerir:
| userId | name | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
SCD tür 2 güncelleştirmelerini işleme
Aşağıdaki örnekte SCD tür 2 güncelleştirmelerinin işlenmesi gösterilmektedir:
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD tür 2 örneğini çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | sıfır |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | sıfır |
| 126 | Zambak | Cancun | 2 | sıfır |
SCD tür 2 sorgusu, hedef tablodaki geçmiş için izlenecek çıkış sütunlarının bir alt kümesini de belirtebilir. Diğer sütunlarda yapılan değişiklikler yeni geçmiş kayıtları oluşturmak yerine yerinde güncelleştirilir. Aşağıdaki örnek, city sütunu izlemeden hariç tutmayı gösterir.
Aşağıdaki örnekte, SCD türü 2 ile izleme geçmişinin kullanılması gösterilmektedir:
Piton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Bu örneği ek TRUNCATE kayıt olmadan çalıştırdıktan sonra hedef tablo aşağıdaki kayıtları içerir:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | sıfır |
| 125 | Mercedes | Guadalajara | 2 | sıfır |
| 126 | Zambak | Cancun | 2 | sıfır |
Test verileri oluşturma
Aşağıdaki kod, bu öğreticide bulunan örnek sorgularda kullanılmak üzere örnek bir veri kümesi oluşturmak üzere sağlanmıştır. Yeni bir şema oluşturmak ve yeni bir tablo oluşturmak için uygun kimlik bilgilerine sahip olduğunuzu varsayarsak, bu deyimleri bir not defteri veya Databricks SQL ile çalıştırabilirsiniz. Aşağıdaki kodun işlem hattı tanımının bir parçası olarak çalıştırılması amaçlanmamıştır :
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Örnek: Düzenli anlık görüntü işleme
Aşağıdaki örnek, konumunda mycatalog.myschema.mytabledepolanan bir tablonun anlık görüntülerini alan SCD tür 2 işlemesini gösterir. İşleme sonuçları adlı targetbir tabloya yazılır.
mycatalog.myschema.mytable 2024-01-01 00:00:00 zaman damgasındaki kayıtlar
| Key | Değer |
|---|---|
| 1 | a1 |
| 2 | a2 |
mycatalog.myschema.mytable 2024-01-01 12:00:00 zaman damgasındaki kayıtlar
| Key | Değer |
|---|---|
| 2 | b2 |
| 3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Anlık görüntüleri işledikten sonra hedef tablo aşağıdaki kayıtları içerir:
| Key | Değer | __START_AT | __END_AT |
|---|---|---|---|
| 1 | a1 | 01.01.2024 00:00 | 2024-01-01 12:00:00 |
| 2 | a2 | 01.01.2024 00:00 | 2024-01-01 12:00:00 |
| 2 | b2 | 2024-01-01 12:00:00 | sıfır |
| 3 | a3 | 2024-01-01 12:00:00 | sıfır |
Örnek: Geçmiş anlık görüntü işleme
Aşağıdaki örnekte, bir bulut depolama sisteminde depolanan iki anlık görüntüden kaynak olayları temel alarak bir hedef tabloyu güncelleştiren SCD tür 2 işleme gösterilmektedir:
timestamp konumunda depolanan /<PATH>/filename1.csv anlık görüntü
| Key | TrackingColumn | İzlenmeyenSütun |
|---|---|---|
| 1 | a1 | b1 |
| 2 | a2 | b2 |
| 4 | a4 | b4 |
timestamp + 5 konumunda depolanan /<PATH>/filename2.csv anlık görüntü
| Key | TrackingColumn | İzlenmeyenSütun |
|---|---|---|
| 2 | a2_new | b2 |
| 3 | a3 | b3 |
| 4 | a4 | b4_new |
Aşağıdaki kod örneği, bu anlık görüntülerle SCD tür 2 güncelleştirmelerinin işlenmesini gösterir:
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Anlık görüntüleri işledikten sonra hedef tablo aşağıdaki kayıtları içerir:
| Key | TrackingColumn | İzlenmeyenSütun | __START_AT | __END_AT |
|---|---|---|---|---|
| 1 | a1 | b1 | 1 | 2 |
| 2 | a2 | b2 | 1 | 2 |
| 2 | a2_new | b2 | 2 | sıfır |
| 3 | a3 | b3 | 2 | sıfır |
| 4 | a4 | b4_new | 1 | sıfır |
Hedef akış tablosuna veri ekleme, değiştirme veya silme
İşlem hattınız tabloları Unity Kataloğu'nda yayımlıyorsa, akış tablolarında (DML) deyimlerini, örneğin ekleme, güncelleştirme, silme ve birleştirme deyimlerini kullanarak, AUTO CDC ... INTO deyimleriyle oluşturulan hedef tabloları değiştirebilirsiniz.
Uyarı
- Akış tablosunun tablo şemasını değiştiren DML deyimleri desteklenmez. DML deyimlerinizin tablo şemasını geliştirmeye çalışmadığından emin olun.
- Akış tablosunu güncelleştiren DML deyimleri, Databricks Runtime 13.3 LTS ve üzeri kullanılarak yalnızca paylaşılan unity kataloğu kümesinde veya SQL ambarında çalıştırılabilir.
- Akış, yalnızca eklemeye izin veren veri kaynakları gerektirdiğinden, işlemeniz bir kaynak akış tablosundan değişiklikler içeren akış gerektiriyorsa (örneğin, DML deyimleri kullanılarak), kaynak akış tablosunu okurken skipChangeCommits bayrağını ayarlayın.
skipChangeCommitsayarlandığında, kaynak tablodaki kayıtları silen veya değiştiren işlemler yoksayılır. İşlemeniz bir akış tablosu gerektirmiyorsa, ekleme kısıtlaması olmayan gerçekleştirilmiş bir görünümü hedef tablo olarak kullanabilirsiniz.
Lakeflow Spark Bildirimli İşlem Hatları belirtilen SEQUENCE BY bir sütun kullandığından ve hedef tablonun ve __START_AT sütunlarına __END_AT uygun sıralama değerlerini yaydığından (SCD türü 2 için), DML deyimlerinin kayıtların düzgün sıralanmasını sağlamak için bu sütunlar için geçerli değerler kullandığından emin olmanız gerekir. Bkz. AUTO CDC API'siyle CDC nasıl uygulanır?.
Akış tablolarıyla DML deyimlerini kullanma hakkında daha fazla bilgi için bkz. Akış tablosunda veri ekleme, değiştirme veya silme.
Aşağıdaki örnek, başlangıç dizisi 5 olan etkin bir kayıt ekler:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
AUTO CDC hedef tablosundan değişiklik veri akışını okuma
Databricks Runtime 15.2 ve üzeri sürümlerde, diğer Delta tablolarındaki değişiklik veri akışını okuduğunuz şekilde, AUTO CDC veya AUTO CDC FROM SNAPSHOT sorgularının hedefi olan bir akış tablosundan da değişiklik veri akışını okuyabilirsiniz. Bir hedef akış tablosundan değişiklik veri akışını okumak için aşağıdakiler gereklidir:
- Hedef akış tablosunun Unity Kataloğu'nda yayımlanması gerekir. Bkz. Unity Kataloğu'nu işlem hatlarıyla kullanma.
- Hedef akış tablosundaki değişiklik veri akışını erişmek için Databricks Runtime 15.2 veya üzerini kullanmanız gerekir. Değişiklik veri akışını farklı bir işlem hattında okumak için işlem hattının Databricks Runtime 15.2 veya üzerini kullanacak şekilde yapılandırılması gerekir.
Lakeflow Spark Bildirimli İşlem Hatlarında oluşturulan bir hedef akış tablosundaki değişiklik veri akışını, diğer Delta tablolarından değişiklik veri akışını okumakla aynı şekilde okursunuz. Python ve SQL örnekleri de dahil olmak üzere Delta değişiklik veri akışı işlevini kullanma hakkında daha fazla bilgi edinmek için bkz. Azure Databricks'te Delta Lake değişiklik veri akışını kullanma.
Uyarı
Değişiklik veri akışı kaydı, değişiklik olayının türünü tanımlayan meta verileri içerir. Tabloda bir kayıt güncellendiğinde, ilişkili değişiklik kayıtlarının meta verileri genellikle _change_type değerlerine ayarlanmış ve update_preimage ile update_postimage olaylarını içerir.
Ancak, birincil anahtar değerlerinin değiştirilmesini _change_type içeren hedef akış tablosunda güncelleştirmeler yapıldığında değerler farklıdır. Değişiklikler birincil anahtar güncelleştirmelerini içerdiğinde, _change_type meta veri alanları, insert ve delete olayları olarak ayarlanır. Birincil anahtarlardaki değişiklikler, UPDATE veya MERGE deyimiyle anahtar alanlarından birinde manuel güncellemeler yapıldığında veya SCD tür 2 tabloları için __start_at alanı, daha önceki bir başlangıç dizisi değerini yansıtacak şekilde değiştiğinde meydana gelebilir.
Sorgu AUTO CDC , SCD tür 1 ve SCD tür 2 işleme için farklılık gösteren birincil anahtar değerlerini belirler:
- SCD tür 1 işlemesi ve işlem hatlarının Python arabirimi için birincil anahtar,
keysparametresinincreate_auto_cdc_flow()işlevindeki değeridir. SQL arabirimi için birincil anahtar,KEYSifadesindekiAUTO CDC ... INTOyan tümcesi tarafından tanımlanan sütunlardır. - SCD tip 2 için birincil anahtar,
keysparametresi veyaKEYSyan tümcesi artıcoalesce(__START_AT, __END_AT)işleminin dönüş değeridir; burada__START_ATve__END_AThedef akış tablosunun karşılık gelen sütunlarıdır.
Veri işleme hatlarındaki CDC sorgusu tarafından işlenen kayıtlar hakkında veri alın
Uyarı
Aşağıdaki ölçümler yalnızca AUTO CDC sorgular tarafından yakalanır, AUTO CDC FROM SNAPSHOT sorgular tarafından yakalanmaz.
Aşağıdaki metrikler AUTO CDC sorgular tarafından yakalanır.
-
num_upserted_rows: Güncelleştirme sırasında veri kümesine ekli çıkış satırlarının sayısı. -
num_deleted_rows: Güncelleştirme sırasında veri kümesinden silinen mevcut çıkış satırlarının sayısı.
num_output_rows CDC olmayan akışlar için çıkış olan ölçüm, AUTO CDC sorgular için yakalanmaz.
Bir işlem hattında CDC işleme için hangi veri nesneleri kullanılır?
Uyarı
- Bu veri yapıları yalnızca
AUTO CDCişlemi için geçerlidir,AUTO CDC FROM SNAPSHOTişlemi için geçerli değildir. - Bu veri yapıları yalnızca hedef tablo Hive meta veri deposunda yayımlandığında geçerlidir. İşlem hattı Unity Kataloğu'nda yayımlanırsa, iç yedekleme tablolarına kullanıcılar erişemez.
Hive meta veri deposunda hedef tabloyu bildirdiğinizde iki veri yapısı oluşturulur:
- Hedef tabloya atanan adı kullanan bir görünüm.
- CDC işlemeyi yönetmek için işlem hattı tarafından kullanılan bir iç yedekleme tablosu. Bu tablo, hedef tablo adının önüne
__apply_changes_storage_eklenerek adlandırılır.
Örneğin, adlı dp_cdc_targetbir hedef tablo bildirirseniz, meta veri deposunda adlı dp_cdc_target bir görünüm ve adlı __apply_changes_storage_dp_cdc_target bir tablo görürsünüz. Bir görünüm oluşturmak, Lakeflow Spark Deklaratif İşlem Hatlarının sırasız verileri işlemek için gereken fazladan bilgileri (örneğin, mezar taşları ve sürümleri) filtrelemesini sağlar. İşlenen verileri görüntülemek için hedef görünümü sorgular. Tablonun şeması __apply_changes_storage_ gelecekteki özellikleri veya geliştirmeleri destekleyecek şekilde değişebileceğinden, tabloyu üretim kullanımı için sorgulamamalısınız. Tabloya el ile veri eklerseniz, sürüm sütunları eksik olduğundan kayıtların diğer değişikliklerden önce geldiği varsayılır.