Udostępnij za pośrednictwem


Interfejsy API usługi AUTO CDC: upraszczanie przechwytywania zmian przy użyciu potoków deklaratywnych usługi Lakeflow

Potoki Deklaratywne Lakeflow upraszczają przechwytywanie zmian danych (CDC) za pomocą interfejsów API AUTO CDC i AUTO CDC FROM SNAPSHOT.

Uwaga

Interfejsy AUTO CDC API zastępują APPLY CHANGES interfejsy API i mają tę samą składnię. Interfejsy APPLY CHANGES API są nadal dostępne, ale usługa Databricks zaleca korzystanie z AUTO CDC interfejsów API w ich miejscu.

Interfejs, którego używasz, zależy od źródła danych o zmianach.

  • Użyj AUTO CDC do przetwarzania zmian z kanału danych zmian (CDF).
  • Użyj AUTO CDC FROM SNAPSHOT (publiczna wersja zapoznawcza i dostępna tylko dla języka Python), aby przetworzyć zmiany w migawkach bazy danych.

Wcześniej polecenie MERGE INTO było często używane do przetwarzania rekordów CDC na platformie Azure Databricks. Jednak MERGE INTO może generować niepoprawne wyniki z powodu nieuporządkowanych rekordów lub wymaga skomplikowanej logiki do ponownego uporządkowania rekordów.

Interfejs AUTO CDC API jest obsługiwany w interfejsach SQL i Python potoków deklaratywnych Lakeflow. Interfejs API jest obsługiwany w interfejsie języka Python potoków deklaratywnych Lakeflow.

Zarówno AUTO CDC, jak i AUTO CDC FROM SNAPSHOT obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:

  • Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
  • Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.

Aby uzyskać informacje o składni i innych odwołaniach, zobacz AUTO CDC for Lakeflow Deklaratywne potoki SQL, AUTO CDC for Lakeflow Deklaratywne potoki Python i AUTO CDC FROM SNAPSHOT for Lakeflow Deklaratywne potoki Python.

Uwaga

W tym artykule opisano, jak aktualizować tabele w Lakeflow Declarative Pipelines na podstawie zmian danych źródłowych. Aby dowiedzieć się, jak rejestrować i zapytaniać o zmiany na poziomie wiersza dla tabel Delta, zobacz Use Delta Lake change data feed on Azure Databricks.

Wymagania

Aby korzystać z interfejsów API CDC, potok musi być skonfigurowany do używania potoków deklaratywnych bezserwerowych Lakeflow lub potoków deklaratywnych Pro Lakeflow lub Advancededycji.

W jaki sposób CDC jest implementowane za pomocą interfejsu API AUTO CDC?

Dzięki automatycznej obsłudze rekordów poza sekwencją, API AUTO CDC w Lakeflow Declarative Pipelines zapewnia prawidłowe przetwarzanie rekordów CDC i eliminuje konieczność opracowywania złożonej logiki do obsługi rekordów poza sekwencją. Należy określić kolumnę w danych źródłowych, na której mają być sekwencjonowane rekordy, co deklaratywne potoki Lakeflow interpretują jako monotonicznie rosnącą reprezentację prawidłowego porządku danych źródłowych. Lakeflow Declarative Pipelines automatycznie obsługują dane przychodzące w niewłaściwej kolejności. W przypadku zmian typu SCD 2, potoki deklaratywne Lakeflow propagują odpowiednie wartości sekwencjonowania do kolumn __START_AT i __END_AT tabeli docelowej. Powinna istnieć jedna odrębna aktualizacja dla każdego klucza przy każdym wartości sekwencji, a wartości sekwencji NULL nie są obsługiwane.

Aby wykonać przetwarzanie CDC za pomocą AUTO CDC, należy najpierw utworzyć tabelę streamingową, a następnie użyć instrukcji AUTO CDC ... INTO w języku SQL lub funkcji create_auto_cdc_flow() w języku Python, aby określić źródło, klucze i sekwencjonowanie dla strumienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj instrukcji CREATE OR REFRESH STREAMING TABLE w języku SQL lub funkcji create_streaming_table() w języku Python. Zobacz przykłady przetwarzania typów SCD 1 i 2.

Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację SQL potoków deklaratywnych lakeflow lub dokumentację języka Python.

Jak jest zaimplementowane CDC w<|> API?

Ważne

Interfejs API AUTO CDC FROM SNAPSHOT znajduje się w fazie Public Preview dla użytkowników publicznych.

AUTO CDC FROM SNAPSHOT to deklaratywne API, które wydajnie określa zmiany w danych źródłowych, porównując serię uporządkowanych migawk, a następnie uruchamia przetwarzanie wymagane do przetwarzania zmian danych (CDC) zapisów w migawkach. AUTO CDC FROM SNAPSHOT jest obsługiwany wyłącznie przez interfejs Python Lakeflow do potoków deklaratywnych.

AUTO CDC FROM SNAPSHOT obsługuje pobieranie migawkowych obrazów z wielu typów źródeł.

  • Użyj okresowego pozyskiwania migawek, aby pobrać migawki z istniejącej tabeli lub widoku. AUTO CDC FROM SNAPSHOT ma prosty, usprawniony interfejs, aby wspierać okresowe pobieranie migawek z istniejącego obiektu bazy danych. Nowa migawka jest wczytywana z każdą aktualizacją potoku, a czas wczytywania jest używany jako wersja migawki. Gdy potok jest uruchamiany w trybie ciągłym, podczas każdej aktualizacji potoku pobieranych jest wiele migawek w okresach określonych przez ustawienie interwału wyzwalacza dla przepływu obejmującego AUTO CDC FROM SNAPSHOT przetwarzanie.
  • Użyj historycznego pobierania migawek do przetwarzania plików zawierających migawki bazy danych, takich jak migawki wygenerowane z bazy danych Oracle lub MySQL albo magazynu danych.

Aby wykonać przetwarzanie CDC z dowolnego typu źródła z użyciem AUTO CDC FROM SNAPSHOT, należy najpierw utworzyć tabelę strumieniową, a następnie użyć funkcji create_auto_cdc_from_snapshot_flow() w Pythonie, aby określić migawkę, klucze i inne argumenty wymagane do zaimplementowania przetwarzania. Zobacz przykłady okresowego pobierania migawki i historycznego pobierania migawki.

Migawki przekazywane do API muszą być uporządkowane rosnąco według wersji. Jeśli Potoki Deklaratywne Lakeflow wykryją migawkę poza kolejnością, wystąpi błąd.

Aby uzyskać szczegółowe informacje o składni, zobacz odniesienie dotyczące języka Python dla Deklaratywnych Potoków Lakeflow.

Użyj wielu kolumn do sekwencjonowania

Możesz ustawić sekwencję według wielu kolumn (na przykład znacznik czasu i identyfikator, aby rozstrzygnąć remisy), możesz użyć struktury STRUCT do ich połączenia: najpierw sortuje według pierwszego pola w STRUCT, a w przypadku remisu bierze pod uwagę drugie pole i tak dalej.

Przykład w języku SQL:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Przykład w języku Python:

sequence_by = struct("timestamp_col", "id_col")

Ograniczenia

Kolumna używana do sekwencjonowania musi być sortowalnym typem danych.

Przykład: przetwarzanie typ 1 SCD i typ 2 SCD z danymi źródłowymi CDF

W poniższych sekcjach przedstawiono przykłady potoków deklaratywnych Lakeflow typu 1 i typu 2, zapytań aktualizujących tabele docelowe na podstawie zdarzeń źródłowych z kanału danych zmiany, które:

  1. Tworzy nowe rekordy użytkowników.
  2. Usuwa rekord użytkownika.
  3. Aktualizuje rekordy użytkowników. W przykładzie SCD typu 1, ostatnie operacje UPDATE docierają późno i są usuwane z tabeli docelowej, co pokazuje obsługę zdarzeń poza kolejnością.

Poniższe przykłady zakładają, że masz świadomość, jak konfigurować i aktualizować deklaratywne potoki Lakeflow. Zobacz Samouczek: tworzenie potoku ETL przy użyciu przechwytywania zmian danych za pomocą potoków deklaratywnych lakeflow.

Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.

Poniżej przedstawiono rekordy wejściowe dla tych przykładów:

Identyfikator użytkownika nazwa miejscowość operacja numer sekwencji
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilia Cancun INSERT 2
123 null null USUŃ 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Jeśli odkomentujesz ostatni wiersz w przykładowych danych, zostanie dodany następujący rekord, który określa, w którym miejscu rekordy powinny zostać obcięte.

Identyfikator użytkownika nazwa miejscowość operacja numer sekwencji
null null null SKRÓCIĆ 3

Uwaga

Wszystkie poniższe przykłady zawierają opcje do określenia zarówno operacji DELETE, jak i TRUNCATE, ale każda z nich jest opcjonalna.

Przetwarzanie aktualizacji SCD typu 1

Następujący przykład demonstruje przetwarzanie aktualizacji typu SCD 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika nazwa miejscowość
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilia Cancun

Po uruchomieniu przykładu typu SCD 1 z dodatkowym rekordem TRUNCATE rekordy 124 i 126 są obcinane z powodu operacji TRUNCATE w sequenceNum=3, a tabela docelowa zawiera następujący rekord:

Identyfikator użytkownika nazwa miejscowość
125 Mercedes Guadalajara

Przetwarzanie aktualizacji typu 2 SCD

Przykład poniżej demonstruje przetwarzanie aktualizacji typu 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika nazwa miejscowość __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lilia Cancun 2 null

Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane bezpośrednio, zamiast generować nowe rekordy historii. W poniższym przykładzie pokazano wykluczenie kolumny city ze śledzenia:

Poniższy przykład demonstruje użycie historii śledzenia z typem 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Po uruchomieniu tego przykładu bez dodatkowego rekordu TRUNCATE tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika nazwa miejscowość __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lilia Cancun 2 null

Generowanie danych testowych

Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia do utworzenia nowego schematu i utworzenia nowej tabeli, możesz uruchomić te instrukcje za pomocą notesu lub bazy danych Databricks SQL. Następujący kod nie jest przeznaczony do uruchamiania w ramach deklaratywnych potoków Lakeflow:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Przykład: Okresowe przetwarzanie migawki

Poniższy przykład pokazuje przetwarzanie typu SCD 2, które pobiera migawki tabeli przechowywanej w mycatalog.myschema.mytable. Wyniki przetwarzania są zapisywane w tabeli o nazwie target.

mycatalog.myschema.mytable rekordy w znaczniku czasu 2024-01-01 00:00:00

Klawisz Wartość
1 a1
2 a2

mycatalog.myschema.mytable zapisy z oznaczeniem czasu 2024-01-01 12:00:00

Klawisz Wartość
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:

Klawisz Wartość __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Przykład: Historyczne przetwarzanie migawek

W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które aktualizuje tabelę docelową na podstawie zdarzeń źródłowych z dwóch migawek przechowywanych w systemie magazynu w chmurze:

Migawka w timestamp, przechowywana w /<PATH>/filename1.csv

Klawisz Kolumna Śledzenia KolumnaBezŚledzenia
1 a1 b1
2 a2 b2
4 a4 b4

Migawka w timestamp + 5, przechowywana w /<PATH>/filename2.csv

Klawisz Kolumna Śledzenia KolumnaBezŚledzenia
2 a2_new b2
3 a3 b3
4 a4 b4_new

Następujący przykład kodu demonstruje przetwarzanie aktualizacji typu SCD 2 z tymi migawkami.

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:

Klawisz Kolumna Śledzenia KolumnaBezŚledzenia __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego

Jeśli potok publikuje tabele w Unity Catalog, możesz użyć instrukcji języka manipulowania danymi (DML), w tym instrukcji wstawiania, aktualizowania, usuwania i scalania, aby zmodyfikować docelowe tabele przesyłania strumieniowego utworzone przez instrukcje AUTO CDC ... INTO.

Uwaga

  • Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
  • Instrukcje DML, które aktualizują tabelę strumieniową, mogą być uruchamiane tylko w udostępnionym klastrze Unity Catalog lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
  • Ponieważ przesyłanie strumieniowe wymaga źródeł danych wyłącznie do dołączania, jeśli twoje przetwarzanie wymaga przesyłania strumieniowego z tabeli źródłowej, która zawiera zmiany (na przykład przy użyciu instrukcji DML), ustaw flagę skipChangeCommits podczas odczytu tej tabeli źródłowej. Po ustawieniu skipChangeCommits transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli Twoje przetwarzanie nie wymaga tabeli streamingowej, możesz użyć widoku materializowanego (który nie ma ograniczenia tylko do appenda) jako tabeli docelowej.

Ponieważ potoki deklaratywne lakeflow używają określonej SEQUENCE BY kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT kolumn i __END_AT tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak CDC jest implementowane za pomocą API AUTO CDC?.

Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.

Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Odczytywanie strumienia danych zmian z tabeli docelowej AUTO CDC

W środowisku Databricks Runtime 15.2 lub nowszym można odczytać strumień danych zmian z tabeli strumieniowej, która jest celem zapytań AUTO CDC lub AUTO CDC FROM SNAPSHOT, w taki sam sposób, jak odczytywanie strumienia danych zmian z innych tabel Delta. Aby odczytać strumień danych zmian z docelowej tabeli strumieniowej, wymagane są następujące elementy:

  • Docelowa tabela strumieniowa musi zostać opublikowana w Unity Catalog. Zobacz, jak korzystać z Unity Catalog z potokami deklaratywnymi Lakeflow.
  • Aby odczytać zestawienie zmian danych z docelowej tabeli przesyłania strumieniowego, musisz użyć środowiska Databricks Runtime 15.2 lub nowszego. Aby odczytać zestawienie danych zmian w innym potoku, potok musi być skonfigurowany do używania środowiska Databricks Runtime 15.2 lub nowszego.

Odczytujesz strumień danych o zmianach z docelowej tabeli strumieniowej utworzonej w ramach Lakeflow Declarative Pipelines w taki sam sposób, jak strumień zmian danych z innych tabel Delta. Aby dowiedzieć się więcej o korzystaniu z funkcji strumienia danych zmiany Delta, w tym z przykładów w językach Python i SQL, zobacz Używanie strumienia danych zmiany Delta Lake na Azure Databricks.

Uwaga

Zapis kanału danych zmiany zawiera metadane identyfikujące rodzaj zdarzenia zmiany. Po zaktualizowaniu rekordu w tabeli, metadane powiązanych rekordów zmian zazwyczaj zawierają wartości _change_type ustawione na update_preimage oraz zdarzenia update_postimage.

Jednak wartości _change_type są różne, jeśli do docelowej tabeli streamingowej wprowadzane są aktualizacje obejmujące zmianę wartości klucza podstawowego. Gdy zmiany obejmują aktualizacje kluczy podstawowych, pola metadanych _change_type są ustawione na wydarzenia insert i delete. Zmiany kluczy podstawowych mogą wystąpić, gdy ręczne aktualizacje są wprowadzane do jednego z pól klucza z instrukcją UPDATE lub MERGE lub w przypadku tabel typu SCD 2, gdy pole __start_at zmieni się w celu odzwierciedlenia wcześniejszej wartości sekwencji początkowej.

Zapytanie AUTO CDC określa wartości klucza podstawowego, które różnią się w przypadku przetwarzania typu SCD 1 i SCD typu 2:

  • W przypadku przetwarzania typu SCD 1 i deklaratywnego interfejsu Python Lakeflow Pipelines, wartość klucza podstawowego jest zawarta w parametrze keys w funkcji create_auto_cdc_flow(). W przypadku interfejsu SQL potoków deklaratywnych Lakeflow klucz podstawowy to kolumny zdefiniowane przez klauzulę KEYS w instrukcji AUTO CDC ... INTO.
  • W przypadku typu SCD 2 klucz podstawowy jest parametrem keys lub klauzulą KEYS oraz wartością zwracaną z operacji coalesce(__START_AT, __END_AT), gdzie __START_AT i __END_AT są odpowiednimi kolumnami z docelowej tabeli przesyłania strumieniowego.

Pobierz dane dotyczące rekordów przetwarzanych przez kwerendę CDC potoków deklaratywnych Lakeflow

Uwaga

Następujące metryki są zbierane tylko przez zapytania AUTO CDC, a nie przez zapytania AUTO CDC FROM SNAPSHOT.

Następujące metryki są gromadzone przez zapytania AUTO CDC:

  • num_upserted_rows: Liczba wierszy wyjściowych dodanych lub zaktualizowanych do zestawu danych podczas aktualizacji.
  • num_deleted_rows: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.

Metryka num_output_rows, wynik dla przepływów innych niż CDC, nie jest rejestrowana dla zapytań AUTO CDC.

Jakie obiekty danych są używane do przetwarzania CDC deklaratywnych potoków Lakeflow?

Uwaga

  • Te struktury danych dotyczą tylko przetwarzania AUTO CDC, a nie przetwarzania AUTO CDC FROM SNAPSHOT.
  • Te struktury danych mają zastosowanie tylko wtedy, gdy tabela docelowa zostanie opublikowana w magazynie metadanych Hive. Jeśli potok publikuje w Unity Catalog, wewnętrzne tabele wspierające są niedostępne dla użytkowników.

Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:

  • Widok używający nazwy przypisanej do tabeli docelowej.
  • Wewnętrzna tabela pomocnicza używana przez potoki deklaratywne Lakeflow do zarządzania przetwarzaniem zmian w danych (CDC). Nazwa tej tabeli powstaje przez dodanie przedrostka __apply_changes_storage_ do nazwy tabeli docelowej.

Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target, w magazynie metadanych zostanie wyświetlony widok o nazwie dlt_cdc_target i tabela o nazwie __apply_changes_storage_dlt_cdc_target. Utworzenie widoku umożliwia Potokom Deklaratywnym Lakeflow filtrowanie dodatkowych informacji, takich jak nagrobki i wersje, które są wymagane do obsługi danych nieuporządkowanych. Aby zobaczyć przetworzone dane, zapytaj docelowy widok. Ponieważ schemat tabeli __apply_changes_storage_ może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli do użytku produkcyjnego. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.

Dodatkowe zasoby