Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Potoki Deklaratywne Lakeflow upraszczają przechwytywanie zmian danych (CDC) za pomocą interfejsów API AUTO CDC
i AUTO CDC FROM SNAPSHOT
.
Uwaga
Interfejsy AUTO CDC
API zastępują APPLY CHANGES
interfejsy API i mają tę samą składnię. Interfejsy APPLY CHANGES
API są nadal dostępne, ale usługa Databricks zaleca korzystanie z AUTO CDC
interfejsów API w ich miejscu.
Interfejs, którego używasz, zależy od źródła danych o zmianach.
- Użyj
AUTO CDC
do przetwarzania zmian z kanału danych zmian (CDF). - Użyj
AUTO CDC FROM SNAPSHOT
(publiczna wersja zapoznawcza i dostępna tylko dla języka Python), aby przetworzyć zmiany w migawkach bazy danych.
Wcześniej polecenie MERGE INTO
było często używane do przetwarzania rekordów CDC na platformie Azure Databricks. Jednak MERGE INTO
może generować niepoprawne wyniki z powodu nieuporządkowanych rekordów lub wymaga skomplikowanej logiki do ponownego uporządkowania rekordów.
Interfejs AUTO CDC
API jest obsługiwany w interfejsach SQL i Python potoków deklaratywnych Lakeflow. Interfejs API jest obsługiwany w interfejsie języka Python potoków deklaratywnych Lakeflow.
Zarówno AUTO CDC
, jak i AUTO CDC FROM SNAPSHOT
obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:
- Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
- Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.
Aby uzyskać informacje o składni i innych odwołaniach, zobacz AUTO CDC for Lakeflow Deklaratywne potoki SQL, AUTO CDC for Lakeflow Deklaratywne potoki Python i AUTO CDC FROM SNAPSHOT for Lakeflow Deklaratywne potoki Python.
Uwaga
W tym artykule opisano, jak aktualizować tabele w Lakeflow Declarative Pipelines na podstawie zmian danych źródłowych. Aby dowiedzieć się, jak rejestrować i zapytaniać o zmiany na poziomie wiersza dla tabel Delta, zobacz Use Delta Lake change data feed on Azure Databricks.
Wymagania
Aby korzystać z interfejsów API CDC, potok musi być skonfigurowany do używania potoków deklaratywnych bezserwerowych Lakeflow lub potoków deklaratywnych Pro
Lakeflow lub Advanced
edycji.
W jaki sposób CDC jest implementowane za pomocą interfejsu API AUTO CDC?
Dzięki automatycznej obsłudze rekordów poza sekwencją, API AUTO CDC w Lakeflow Declarative Pipelines zapewnia prawidłowe przetwarzanie rekordów CDC i eliminuje konieczność opracowywania złożonej logiki do obsługi rekordów poza sekwencją. Należy określić kolumnę w danych źródłowych, na której mają być sekwencjonowane rekordy, co deklaratywne potoki Lakeflow interpretują jako monotonicznie rosnącą reprezentację prawidłowego porządku danych źródłowych. Lakeflow Declarative Pipelines automatycznie obsługują dane przychodzące w niewłaściwej kolejności. W przypadku zmian typu SCD 2, potoki deklaratywne Lakeflow propagują odpowiednie wartości sekwencjonowania do kolumn __START_AT
i __END_AT
tabeli docelowej. Powinna istnieć jedna odrębna aktualizacja dla każdego klucza przy każdym wartości sekwencji, a wartości sekwencji NULL nie są obsługiwane.
Aby wykonać przetwarzanie CDC za pomocą AUTO CDC
, należy najpierw utworzyć tabelę streamingową, a następnie użyć instrukcji AUTO CDC ... INTO
w języku SQL lub funkcji create_auto_cdc_flow()
w języku Python, aby określić źródło, klucze i sekwencjonowanie dla strumienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj instrukcji CREATE OR REFRESH STREAMING TABLE
w języku SQL lub funkcji create_streaming_table()
w języku Python. Zobacz przykłady przetwarzania typów SCD 1 i 2.
Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację SQL potoków deklaratywnych lakeflow lub dokumentację języka Python.
Jak jest zaimplementowane CDC w<|> API?
Ważne
Interfejs API AUTO CDC FROM SNAPSHOT
znajduje się w fazie Public Preview dla użytkowników publicznych.
AUTO CDC FROM SNAPSHOT
to deklaratywne API, które wydajnie określa zmiany w danych źródłowych, porównując serię uporządkowanych migawk, a następnie uruchamia przetwarzanie wymagane do przetwarzania zmian danych (CDC) zapisów w migawkach.
AUTO CDC FROM SNAPSHOT
jest obsługiwany wyłącznie przez interfejs Python Lakeflow do potoków deklaratywnych.
AUTO CDC FROM SNAPSHOT
obsługuje pobieranie migawkowych obrazów z wielu typów źródeł.
- Użyj okresowego pozyskiwania migawek, aby pobrać migawki z istniejącej tabeli lub widoku.
AUTO CDC FROM SNAPSHOT
ma prosty, usprawniony interfejs, aby wspierać okresowe pobieranie migawek z istniejącego obiektu bazy danych. Nowa migawka jest wczytywana z każdą aktualizacją potoku, a czas wczytywania jest używany jako wersja migawki. Gdy potok jest uruchamiany w trybie ciągłym, podczas każdej aktualizacji potoku pobieranych jest wiele migawek w okresach określonych przez ustawienie interwału wyzwalacza dla przepływu obejmującegoAUTO CDC FROM SNAPSHOT
przetwarzanie. - Użyj historycznego pobierania migawek do przetwarzania plików zawierających migawki bazy danych, takich jak migawki wygenerowane z bazy danych Oracle lub MySQL albo magazynu danych.
Aby wykonać przetwarzanie CDC z dowolnego typu źródła z użyciem AUTO CDC FROM SNAPSHOT
, należy najpierw utworzyć tabelę strumieniową, a następnie użyć funkcji create_auto_cdc_from_snapshot_flow()
w Pythonie, aby określić migawkę, klucze i inne argumenty wymagane do zaimplementowania przetwarzania. Zobacz przykłady okresowego pobierania migawki i historycznego pobierania migawki.
Migawki przekazywane do API muszą być uporządkowane rosnąco według wersji. Jeśli Potoki Deklaratywne Lakeflow wykryją migawkę poza kolejnością, wystąpi błąd.
Aby uzyskać szczegółowe informacje o składni, zobacz odniesienie dotyczące języka Python dla Deklaratywnych Potoków Lakeflow.
Użyj wielu kolumn do sekwencjonowania
Możesz ustawić sekwencję według wielu kolumn (na przykład znacznik czasu i identyfikator, aby rozstrzygnąć remisy), możesz użyć struktury STRUCT do ich połączenia: najpierw sortuje według pierwszego pola w STRUCT, a w przypadku remisu bierze pod uwagę drugie pole i tak dalej.
Przykład w języku SQL:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Przykład w języku Python:
sequence_by = struct("timestamp_col", "id_col")
Ograniczenia
Kolumna używana do sekwencjonowania musi być sortowalnym typem danych.
Przykład: przetwarzanie typ 1 SCD i typ 2 SCD z danymi źródłowymi CDF
W poniższych sekcjach przedstawiono przykłady potoków deklaratywnych Lakeflow typu 1 i typu 2, zapytań aktualizujących tabele docelowe na podstawie zdarzeń źródłowych z kanału danych zmiany, które:
- Tworzy nowe rekordy użytkowników.
- Usuwa rekord użytkownika.
- Aktualizuje rekordy użytkowników. W przykładzie SCD typu 1, ostatnie operacje
UPDATE
docierają późno i są usuwane z tabeli docelowej, co pokazuje obsługę zdarzeń poza kolejnością.
Poniższe przykłady zakładają, że masz świadomość, jak konfigurować i aktualizować deklaratywne potoki Lakeflow. Zobacz Samouczek: tworzenie potoku ETL przy użyciu przechwytywania zmian danych za pomocą potoków deklaratywnych lakeflow.
Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.
Poniżej przedstawiono rekordy wejściowe dla tych przykładów:
Identyfikator użytkownika | nazwa | miejscowość | operacja | numer sekwencji |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lilia | Cancun | INSERT | 2 |
123 | null | null | USUŃ | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Jeśli odkomentujesz ostatni wiersz w przykładowych danych, zostanie dodany następujący rekord, który określa, w którym miejscu rekordy powinny zostać obcięte.
Identyfikator użytkownika | nazwa | miejscowość | operacja | numer sekwencji |
---|---|---|---|---|
null | null | null | SKRÓCIĆ | 3 |
Uwaga
Wszystkie poniższe przykłady zawierają opcje do określenia zarówno operacji DELETE
, jak i TRUNCATE
, ale każda z nich jest opcjonalna.
Przetwarzanie aktualizacji SCD typu 1
Następujący przykład demonstruje przetwarzanie aktualizacji typu SCD 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | nazwa | miejscowość |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lilia | Cancun |
Po uruchomieniu przykładu typu SCD 1 z dodatkowym rekordem TRUNCATE
rekordy 124
i 126
są obcinane z powodu operacji TRUNCATE
w sequenceNum=3
, a tabela docelowa zawiera następujący rekord:
Identyfikator użytkownika | nazwa | miejscowość |
---|---|---|
125 | Mercedes | Guadalajara |
Przetwarzanie aktualizacji typu 2 SCD
Przykład poniżej demonstruje przetwarzanie aktualizacji typu 2 SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | nazwa | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lilia | Cancun | 2 | null |
Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane bezpośrednio, zamiast generować nowe rekordy historii. W poniższym przykładzie pokazano wykluczenie kolumny city
ze śledzenia:
Poniższy przykład demonstruje użycie historii śledzenia z typem 2 SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Po uruchomieniu tego przykładu bez dodatkowego rekordu TRUNCATE
tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | nazwa | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lilia | Cancun | 2 | null |
Generowanie danych testowych
Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia do utworzenia nowego schematu i utworzenia nowej tabeli, możesz uruchomić te instrukcje za pomocą notesu lub bazy danych Databricks SQL. Następujący kod nie jest przeznaczony do uruchamiania w ramach deklaratywnych potoków Lakeflow:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Przykład: Okresowe przetwarzanie migawki
Poniższy przykład pokazuje przetwarzanie typu SCD 2, które pobiera migawki tabeli przechowywanej w mycatalog.myschema.mytable
. Wyniki przetwarzania są zapisywane w tabeli o nazwie target
.
mycatalog.myschema.mytable
rekordy w znaczniku czasu 2024-01-01 00:00:00
Klawisz | Wartość |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
zapisy z oznaczeniem czasu 2024-01-01 12:00:00
Klawisz | Wartość |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Klawisz | Wartość | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Przykład: Historyczne przetwarzanie migawek
W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które aktualizuje tabelę docelową na podstawie zdarzeń źródłowych z dwóch migawek przechowywanych w systemie magazynu w chmurze:
Migawka w timestamp
, przechowywana w /<PATH>/filename1.csv
Klawisz | Kolumna Śledzenia | KolumnaBezŚledzenia |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Migawka w timestamp + 5
, przechowywana w /<PATH>/filename2.csv
Klawisz | Kolumna Śledzenia | KolumnaBezŚledzenia |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
Następujący przykład kodu demonstruje przetwarzanie aktualizacji typu SCD 2 z tymi migawkami.
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Klawisz | Kolumna Śledzenia | KolumnaBezŚledzenia | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego
Jeśli potok publikuje tabele w Unity Catalog, możesz użyć instrukcji języka manipulowania danymi (DML), w tym instrukcji wstawiania, aktualizowania, usuwania i scalania, aby zmodyfikować docelowe tabele przesyłania strumieniowego utworzone przez instrukcje AUTO CDC ... INTO
.
Uwaga
- Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
- Instrukcje DML, które aktualizują tabelę strumieniową, mogą być uruchamiane tylko w udostępnionym klastrze Unity Catalog lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
- Ponieważ przesyłanie strumieniowe wymaga źródeł danych wyłącznie do dołączania, jeśli twoje przetwarzanie wymaga przesyłania strumieniowego z tabeli źródłowej, która zawiera zmiany (na przykład przy użyciu instrukcji DML), ustaw flagę skipChangeCommits podczas odczytu tej tabeli źródłowej. Po ustawieniu
skipChangeCommits
transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli Twoje przetwarzanie nie wymaga tabeli streamingowej, możesz użyć widoku materializowanego (który nie ma ograniczenia tylko do appenda) jako tabeli docelowej.
Ponieważ potoki deklaratywne lakeflow używają określonej SEQUENCE BY
kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT
kolumn i __END_AT
tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak CDC jest implementowane za pomocą API AUTO CDC?.
Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.
Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Odczytywanie strumienia danych zmian z tabeli docelowej AUTO CDC
W środowisku Databricks Runtime 15.2 lub nowszym można odczytać strumień danych zmian z tabeli strumieniowej, która jest celem zapytań AUTO CDC
lub AUTO CDC FROM SNAPSHOT
, w taki sam sposób, jak odczytywanie strumienia danych zmian z innych tabel Delta. Aby odczytać strumień danych zmian z docelowej tabeli strumieniowej, wymagane są następujące elementy:
- Docelowa tabela strumieniowa musi zostać opublikowana w Unity Catalog. Zobacz, jak korzystać z Unity Catalog z potokami deklaratywnymi Lakeflow.
- Aby odczytać zestawienie zmian danych z docelowej tabeli przesyłania strumieniowego, musisz użyć środowiska Databricks Runtime 15.2 lub nowszego. Aby odczytać zestawienie danych zmian w innym potoku, potok musi być skonfigurowany do używania środowiska Databricks Runtime 15.2 lub nowszego.
Odczytujesz strumień danych o zmianach z docelowej tabeli strumieniowej utworzonej w ramach Lakeflow Declarative Pipelines w taki sam sposób, jak strumień zmian danych z innych tabel Delta. Aby dowiedzieć się więcej o korzystaniu z funkcji strumienia danych zmiany Delta, w tym z przykładów w językach Python i SQL, zobacz Używanie strumienia danych zmiany Delta Lake na Azure Databricks.
Uwaga
Zapis kanału danych zmiany zawiera metadane identyfikujące rodzaj zdarzenia zmiany. Po zaktualizowaniu rekordu w tabeli, metadane powiązanych rekordów zmian zazwyczaj zawierają wartości _change_type
ustawione na update_preimage
oraz zdarzenia update_postimage
.
Jednak wartości _change_type
są różne, jeśli do docelowej tabeli streamingowej wprowadzane są aktualizacje obejmujące zmianę wartości klucza podstawowego. Gdy zmiany obejmują aktualizacje kluczy podstawowych, pola metadanych _change_type
są ustawione na wydarzenia insert
i delete
. Zmiany kluczy podstawowych mogą wystąpić, gdy ręczne aktualizacje są wprowadzane do jednego z pól klucza z instrukcją UPDATE
lub MERGE
lub w przypadku tabel typu SCD 2, gdy pole __start_at
zmieni się w celu odzwierciedlenia wcześniejszej wartości sekwencji początkowej.
Zapytanie AUTO CDC
określa wartości klucza podstawowego, które różnią się w przypadku przetwarzania typu SCD 1 i SCD typu 2:
- W przypadku przetwarzania typu SCD 1 i deklaratywnego interfejsu Python Lakeflow Pipelines, wartość klucza podstawowego jest zawarta w parametrze
keys
w funkcjicreate_auto_cdc_flow()
. W przypadku interfejsu SQL potoków deklaratywnych Lakeflow klucz podstawowy to kolumny zdefiniowane przez klauzulęKEYS
w instrukcjiAUTO CDC ... INTO
. - W przypadku typu SCD 2 klucz podstawowy jest parametrem
keys
lub klauzuląKEYS
oraz wartością zwracaną z operacjicoalesce(__START_AT, __END_AT)
, gdzie__START_AT
i__END_AT
są odpowiednimi kolumnami z docelowej tabeli przesyłania strumieniowego.
Pobierz dane dotyczące rekordów przetwarzanych przez kwerendę CDC potoków deklaratywnych Lakeflow
Uwaga
Następujące metryki są zbierane tylko przez zapytania AUTO CDC
, a nie przez zapytania AUTO CDC FROM SNAPSHOT
.
Następujące metryki są gromadzone przez zapytania AUTO CDC
:
-
num_upserted_rows
: Liczba wierszy wyjściowych dodanych lub zaktualizowanych do zestawu danych podczas aktualizacji. -
num_deleted_rows
: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.
Metryka num_output_rows
, wynik dla przepływów innych niż CDC, nie jest rejestrowana dla zapytań AUTO CDC
.
Jakie obiekty danych są używane do przetwarzania CDC deklaratywnych potoków Lakeflow?
Uwaga
- Te struktury danych dotyczą tylko przetwarzania
AUTO CDC
, a nie przetwarzaniaAUTO CDC FROM SNAPSHOT
. - Te struktury danych mają zastosowanie tylko wtedy, gdy tabela docelowa zostanie opublikowana w magazynie metadanych Hive. Jeśli potok publikuje w Unity Catalog, wewnętrzne tabele wspierające są niedostępne dla użytkowników.
Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:
- Widok używający nazwy przypisanej do tabeli docelowej.
- Wewnętrzna tabela pomocnicza używana przez potoki deklaratywne Lakeflow do zarządzania przetwarzaniem zmian w danych (CDC). Nazwa tej tabeli powstaje przez dodanie przedrostka
__apply_changes_storage_
do nazwy tabeli docelowej.
Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target
, w magazynie metadanych zostanie wyświetlony widok o nazwie dlt_cdc_target
i tabela o nazwie __apply_changes_storage_dlt_cdc_target
. Utworzenie widoku umożliwia Potokom Deklaratywnym Lakeflow filtrowanie dodatkowych informacji, takich jak nagrobki i wersje, które są wymagane do obsługi danych nieuporządkowanych. Aby zobaczyć przetworzone dane, zapytaj docelowy widok. Ponieważ schemat tabeli __apply_changes_storage_
może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli do użytku produkcyjnego. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.