Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Potoki deklaratywne Lakeflow Spark upraszczają przechwytywanie zmian danych za pomocą interfejsów API AUTO CDC i AUTO CDC FROM SNAPSHOT. Interfejsy API automatyzują złożoność przetwarzania wolno zmieniających się wymiarów (SCD) typu 1 i typu 2 z pliku CDC lub migawek bazy danych. Aby dowiedzieć się więcej na temat tych pojęć, zobacz Zmienianie przechwytywania i migawek danych.
Uwaga / Notatka
Interfejsy AUTO CDC API zastępują APPLY CHANGES interfejsy API i mają tę samą składnię. API APPLY CHANGES są nadal dostępne, ale Databricks zaleca używanie API AUTO CDC zamiast nich.
Używany interfejs API zależy od źródła danych zmian:
-
AUTO CDC: użyj tej opcji, gdy źródłowa baza danych ma włączony kanał CDC.AUTO CDCprzetwarza zmiany ze strumienia danych o zmianach (CDF). Jest obsługiwany zarówno w interfejsach potoku SQL, jak i Python. -
AUTO CDC FROM SNAPSHOT: użyj tej opcji, gdy usługa CDC nie jest włączona w źródłowej bazie danych i dostępne są tylko migawki. Ten interfejs API porównuje migawki w celu określenia zmian, a następnie ich przetwarzania. Jest on obsługiwany tylko w interfejsie języka Python.
Oba interfejsy API obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:
- Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
- Użyj typu SCD 2, aby zachować historię rekordów dla wszystkich aktualizacji lub aktualizacji określonego zestawu kolumn.
API AUTO CDC nie są obsługiwane przez deklaratywne potoki Apache Spark.
Aby uzyskać informacje o składni i innych odwołaniach, zobacz AUTO CDC INTO (pipelines), create_auto_cdc_flow i create_auto_cdc_from_snapshot_flow.
Uwaga / Notatka
Na tej stronie opisano sposób aktualizowania tabel w pipeline'ach na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
Requirements
Aby korzystać z interfejsów API CDC, pipeline musi być skonfigurowany do użycia
Jak działa AUTO CDC
Aby wykonać przetwarzanie CDC z użyciem AUTO CDC, utwórz tabelę strumieniową, a następnie użyj instrukcji AUTO CDC ... INTO w języku SQL lub funkcji create_auto_cdc_flow() w języku Python, aby określić źródło, klucze i sekwencjonowanie dla strumienia zmian. Aby uzyskać wyjaśnienie sposobu działania sekwencjonowania i logiki SCD, zobacz Zmienianie przechwytywania i migawek danych. Zobacz przykłady usługi AUTO CDC.
Dla inicjalnego zasilenia ze źródła ze strumieniem zmian użyj AUTO CDC z przepływem once, a następnie kontynuuj przetwarzanie strumienia zmian. Zobacz Replikowanie zewnętrznej tabeli RDBMS przy użyciu usługi AUTO CDC.
Aby uzyskać szczegółowe informacje o składni, zobacz AUTO CDC INTO (potoki) lub create_auto_cdc_flow.
Jak działa AUTO CDC FROM SNAPSHOT
AUTO CDC FROM SNAPSHOT określa zmiany w danych źródłowych, porównując migawki w porządku chronologicznym. Obsługiwany jest tylko w interfejsie potoku Pythona. Migawki można odczytywać bezpośrednio z tabeli Delta, z plików w magazynie chmurowym lub z JDBC.
Aby wykonać przetwarzanie CDC z AUTO CDC FROM SNAPSHOT, utwórz tabelę streamingu, a następnie użyj funkcji create_auto_cdc_from_snapshot_flow(), aby określić migawkę, klucze i inne argumenty. Aby uzyskać szczegółowe informacje na temat dwóch wzorców przystosowania i kiedy ich używać, zobacz Wzorce przetwarzania migawek. Zobacz przykłady AUTO CDC FROM SNAPSHOT.
Aby uzyskać szczegółowe informacje o składni, zobacz create_auto_cdc_from_snapshot_flow.
Używanie wielu kolumn do sekwencjonowania
Aby sekwencjonować według wielu kolumn (na przykład znacznik czasu i identyfikator przerwania więzi), użyj elementu , STRUCT aby je połączyć. Interfejs API najpierw sortuje według pierwszego pola, a w przypadku remisu uwzględnia drugie pole i tak dalej.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
Przykłady usługi AUTO CDC
W poniższych przykładach pokazano przetwarzanie SCD typu 1 i 2 przy użyciu źródła strumienia danych zmian. Przykładowe dane tworzą nowe rekordy użytkownika, usuwają rekordy użytkownika i aktualizują rekordy użytkowników. W przykładzie SCD Type 1 ostatnie UPDATE operacje docierają późno i są usuwane z tabeli docelowej, demonstrując obsługę zdarzeń w nieprawidłowej kolejności.
Poniżej przedstawiono rekordy wejściowe używane w tych przykładach. Te dane są tworzone przez uruchomienie zapytania w sekcji Tworzenie przykładowych danych .
| userId | nazwa | city | operacja | sequenceNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lilia | Cancun | INSERT | 2 |
| 123 | null | null | USUŃ | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Jeśli anulujesz komentarz w ostatnim wierszu w zapytaniu generowania danych przykładowych, wstawia on następujący rekord, który określa obcięcie tabeli (wyczyść tabelę) w pliku sequenceNum=3:
| userId | nazwa | city | operacja | sequenceNum |
|---|---|---|---|---|
| null | null | null | SKRACAĆ | 3 |
Uwaga / Notatka
Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE i TRUNCATE , ale każdy z nich jest opcjonalny.
Tworzenie przykładowych danych
Uruchom następujące instrukcje, aby utworzyć przykładowy zestaw danych. Ten kod nie jest przeznaczony do uruchomienia w ramach definicji potoku. Uruchom to z folderu eksploracji w twoim potoku danych, zamiast z folderu przekształceń.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Przetwarzanie aktualizacji typu SCD 1
Typ SCD 1 przechowuje tylko najnowszą wersję każdego rekordu. Poniższy przykład odczytuje z powyższego kanału danych zmian i stosuje zmiany do docelowej tabeli przesyłania strumieniowego. Opracuj deklaratywne potoki Spark w usłudze Lakeflow, aby uruchomić ten kod.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:
| userId | nazwa | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lilia | Cancun |
Użytkownik 123 (Isabel) został usunięty i nie jest wyświetlany. Użytkownik 125 (Mercedes) pokazuje tylko najnowsze miasto (Guadalajara), ponieważ typ SCD 1 zastępuje poprzednie wartości. Wcześniejsza wartość w UPDATE i sequenceNum=5 została porzucona, ponieważ pojawiła się późniejsza aktualizacja w sequenceNum=6.
Po uruchomieniu przykładu z odkomentowanym rekordem TRUNCATE tabela zostanie wyczyszczona pod adresem sequenceNum=3. Oznacza to, że rekordy 124 i 126 nie znajdują się w tabeli, a końcowa tabela docelowa zawiera tylko następujący rekord:
| userId | nazwa | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Przetwarzanie aktualizacji typu SCD 2
Typ SCD 2 zachowuje pełną historię zmian, tworząc nowe wiersze dla każdej wersji rekordu z kolumnami __START_AT i __END_AT wskazującymi, kiedy każda wersja była aktywna.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po uruchomieniu przykładu typu SCD 2 tabela docelowa zawiera następujące rekordy:
| userId | nazwa | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lilia | Cancun | 2 | null |
Tabela zachowuje pełną historię danych. Użytkownik 123 ma dwie wersje (zakończone sekwencją 6 po usunięciu). Użytkownik 125 ma trzy wersje pokazujące zmiany miasta. Rekordy z __END_AT = null są obecnie aktywne.
Śledzenie podzestawu kolumn przy użyciu typu SCD 2
Domyślnie typ SCD 2 tworzy nową wersję za każdym razem, gdy każda wartość kolumny ulegnie zmianie. Możesz określić podzbiór kolumn do śledzenia, aby zmiany w innych kolumnach aktualizowały bieżącą wersję zamiast generowania nowego rekordu historii.
Poniższy przykład wyklucza kolumnę city ze śledzenia historii:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Ponieważ city zmiany nie są śledzone, aktualizacje miejskie zastępują obecny wiersz zamiast tworzyć nową wersję. Tabela docelowa zawiera następujące rekordy:
| userId | nazwa | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Guadalajara | 2 | null |
| 126 | Lilia | Cancun | 2 | null |
Przykłady automatycznego CDC z migawki
W poniższych sekcjach przedstawiono przykłady użycia AUTO CDC FROM SNAPSHOT do przetwarzania obrazów danych w tabele docelowe typu SCD 1 lub SCD 2. Aby uzyskać informacje na temat tego, kiedy używać tego interfejsu API, zobacz Zmienianie przechwytywania i migawek danych.
Przykład: przetwarzanie migawek przy użyciu czasu pozyskiwania potoku
Użyj tego podejścia, gdy migawki są regularnie dostarczane i uporządkowane, a także można polegać na sygnaturze czasowej wykonania potoku do wersjonowania. Nowa migawka jest pozyskiwana wraz z każdą aktualizacją potoku.
Migawki można odczytywać z wielu typów źródeł, w tym z tabel Delta, plików w chmurze i połączeń JDBC.
Krok 1. Tworzenie przykładowych danych
Utwórz tabelę zawierającą dane migawki. Uruchom następujący kod z notatnika lub usługi Databricks SQL w folderze explorations Twojego potoku:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Krok 2. Uruchom automatyczny CDC z migawki
Aby uruchomić kod w tym kroku, twórz deklaratywne potoki Spark na platformie Lakeflow.
Wybierz typ źródła dla widoku migawki (przykładowy kod tworzenia generuje Tabelę Delta):
Opcja A: Odczyt z tabeli delty
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opcja B: odczyt z magazynu w chmurze
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opcja C: Odczyt z JDBC (tylko obliczenia klasyczne)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Wszystkie opcje, zapis do obiektu docelowego
Następnie dodaj tabelę docelową i przepływ pracy:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Po pierwszym uruchomieniu procesu potokowego wszystkie rekordy są wstawiane jako aktywne zapisy.
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | null |
| 2 | Monterrey | 0 | null |
| 3 | Tijuana | 0 | null |
Uwaga / Notatka
Aby zamiast tego użyć typu SCD 1 i zachować tylko bieżący stan, ustaw wartość stored_as_scd_type=1. W tym przypadku tabela docelowa nie zawiera kolumn __START_AT i __END_AT.
Krok 3: Zasymuluj nową migawkę i uruchom ponownie
Zaktualizuj tabelę źródłową, aby zasymulować przybycie nowej momentki (uruchom ten kod z notebooka lub pliku SQL w folderze explorations swojej pipeline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Uruchom ponownie potok.
AUTO CDC FROM SNAPSHOT Porównuje nową migawkę z poprzednią i wykrywa, że użytkownik 1 został usunięty, użytkownicy 2 i 3 zostały zaktualizowane, a użytkownicy 4 i 6 zostały wstawione. To generuje strumień zmian, i używa AUTO CDC do utworzenia tabeli wyjściowej.
Po drugim uruchomieniu z typem SCD 2 tabela docelowa zawiera następujące rekordy:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | null |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | null |
| 4 | Dolina Śmierci | 1 | null |
| 6 | Kanion Królów | 1 | null |
Użytkownik 1 został zakończony (usunięty). Użytkownicy 2 i 3 mają dwie wersje pokazujące zmiany w mieście. Nowo wstawiono użytkowników 4 i 6.
Po drugim uruchomieniu z typem SCD 1 tabela docelowa zawiera tylko bieżący stan:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Dolina Śmierci |
| 6 | Kanion Królów |
Przykład: przetwarzanie migawek przy użyciu funkcji wersji
Użyj tej metody, jeśli potrzebujesz jawnej kontroli nad kolejnością migawek. Na przykład użyj tego podejścia, gdy wiele migawek przychodzi w tym samym czasie, lub gdy migawki przychodzą w nieprawidłowej kolejności. Piszesz funkcję określającą, która migawka ma być przetwarzana jako następna oraz jej numer wersji. Interfejs API przetwarza migawki w kolejności rosnącej wersji:
- Gdy wiele migawek jest przechowywanych, wszystkie są przetwarzane w kolejności.
- Jeśli migawka pojawi się w niewłaściwej kolejności (na przykład
snapshot_3pojawi się posnapshot_4), zostanie pominięta. - Jeśli nie ma żadnych nowych migawek, funkcja zwraca
None, a przetwarzanie nie odbywa się.
Krok 1. Przygotowywanie plików migawek
Utwórz pliki CSV zawierające dane migawek i dodaj je do lokalizacji zasobu danych lub przechowywania w chmurze. Nazwij pliki chronologicznie (na przykład snapshot_1.csv, snapshot_2.csv).
Każdy plik powinien zawierać kolumny dla userId i city. Przykład:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Dolina Śmierci |
Krok 2: Uruchom AUTO CDC FROM SNAPSHOT z funkcją wersji.
Utwórz nowy notes i wklej następujący kod potoku. Następnie opracuj deklaratywne potoki Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Uwaga / Notatka
Aby zamiast tego użyć typu SCD 1, ustaw wartość stored_as_scd_type=1.
Po przetworzeniu snapshot_1.csvtabela docelowa zawiera następujące rekordy:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | null |
| 2 | Monterrey | 1 | null |
| 3 | Tijuana | 1 | null |
Po przetworzeniu snapshot_2.csvtabela docelowa zawiera następujące rekordy:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | null |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | null |
| 4 | Dolina Śmierci | 2 | null |
Uwaga / Notatka
Pamiętaj, że w przypadku typu SCD 1 tabela wygląda dokładnie tak jak najnowsza migawka. Różnica polega na tym, że zapytania podrzędne mogą używać zestawienia zmian tylko do przetwarzania zmienionych rekordów.
Krok 3: Dodaj nowe migawki
Dodaj nowy plik CSV do miejsca przechowywania z danymi, które zostały zmodyfikowane (na przykład ze zmienionymi wartościami miast, nowymi wierszami lub usuniętymi wierszami). Następnie ponownie uruchom potok, aby przetworzyć nową migawkę.
Ograniczenia
- Kolumna sekwencjonowania musi być sortowalnym typem danych.
NULLwartości sekwencjonowania nie są obsługiwane. -
AUTO CDC FROM SNAPSHOTjest obsługiwany tylko w interfejsie potoku języka Python; interfejs SQL nie jest obsługiwany.
Dodatkowe zasoby
- Przechwytywanie danych i migawki: zapoznaj się z koncepcjami CDC, migawkami oraz typami SCD.
-
Replikuj zewnętrzną tabelę RDBMS za pomocą
AUTO CDC: Dowiedz się, jak przeprowadzić początkowe załadowanie danych w przepływieonce, a potem kontynuować przetwarzanie zmian. - Zaawansowane tematy usługi AUTO CDC: Dowiedz się więcej o operacjach zmian w obiektach docelowych usługi AUTO CDC, odczytywaniu źródeł danych zmian i przetwarzaniu metryk.
- Samouczek: zbudować potok ETL przy użyciu wychwytywania zmian danych