Udostępnij za pośrednictwem


Interfejsy API AUTO CDC upraszczają przechwytywanie danych zmian za pomocą potoków.

Potoki deklaratywne Lakeflow Spark upraszczają przechwytywanie zmian danych za pomocą interfejsów API AUTO CDC i AUTO CDC FROM SNAPSHOT. Interfejsy API automatyzują złożoność przetwarzania wolno zmieniających się wymiarów (SCD) typu 1 i typu 2 z pliku CDC lub migawek bazy danych. Aby dowiedzieć się więcej na temat tych pojęć, zobacz Zmienianie przechwytywania i migawek danych.

Uwaga / Notatka

Interfejsy AUTO CDC API zastępują APPLY CHANGES interfejsy API i mają tę samą składnię. API APPLY CHANGES są nadal dostępne, ale Databricks zaleca używanie API AUTO CDC zamiast nich.

Używany interfejs API zależy od źródła danych zmian:

  • AUTO CDC: użyj tej opcji, gdy źródłowa baza danych ma włączony kanał CDC. AUTO CDC przetwarza zmiany ze strumienia danych o zmianach (CDF). Jest obsługiwany zarówno w interfejsach potoku SQL, jak i Python.
  • AUTO CDC FROM SNAPSHOT: użyj tej opcji, gdy usługa CDC nie jest włączona w źródłowej bazie danych i dostępne są tylko migawki. Ten interfejs API porównuje migawki w celu określenia zmian, a następnie ich przetwarzania. Jest on obsługiwany tylko w interfejsie języka Python.

Oba interfejsy API obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:

  • Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
  • Użyj typu SCD 2, aby zachować historię rekordów dla wszystkich aktualizacji lub aktualizacji określonego zestawu kolumn.

API AUTO CDC nie są obsługiwane przez deklaratywne potoki Apache Spark.

Aby uzyskać informacje o składni i innych odwołaniach, zobacz AUTO CDC INTO (pipelines), create_auto_cdc_flow i create_auto_cdc_from_snapshot_flow.

Uwaga / Notatka

Na tej stronie opisano sposób aktualizowania tabel w pipeline'ach na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).

Requirements

Aby korzystać z interfejsów API CDC, pipeline musi być skonfigurowany do użycia bezserwerowego SDP lub edycji SDP lub .

Jak działa AUTO CDC

Aby wykonać przetwarzanie CDC z użyciem AUTO CDC, utwórz tabelę strumieniową, a następnie użyj instrukcji AUTO CDC ... INTO w języku SQL lub funkcji create_auto_cdc_flow() w języku Python, aby określić źródło, klucze i sekwencjonowanie dla strumienia zmian. Aby uzyskać wyjaśnienie sposobu działania sekwencjonowania i logiki SCD, zobacz Zmienianie przechwytywania i migawek danych. Zobacz przykłady usługi AUTO CDC.

Dla inicjalnego zasilenia ze źródła ze strumieniem zmian użyj AUTO CDC z przepływem once, a następnie kontynuuj przetwarzanie strumienia zmian. Zobacz Replikowanie zewnętrznej tabeli RDBMS przy użyciu usługi AUTO CDC.

Aby uzyskać szczegółowe informacje o składni, zobacz AUTO CDC INTO (potoki) lub create_auto_cdc_flow.

Jak działa AUTO CDC FROM SNAPSHOT

AUTO CDC FROM SNAPSHOT określa zmiany w danych źródłowych, porównując migawki w porządku chronologicznym. Obsługiwany jest tylko w interfejsie potoku Pythona. Migawki można odczytywać bezpośrednio z tabeli Delta, z plików w magazynie chmurowym lub z JDBC.

Aby wykonać przetwarzanie CDC z AUTO CDC FROM SNAPSHOT, utwórz tabelę streamingu, a następnie użyj funkcji create_auto_cdc_from_snapshot_flow(), aby określić migawkę, klucze i inne argumenty. Aby uzyskać szczegółowe informacje na temat dwóch wzorców przystosowania i kiedy ich używać, zobacz Wzorce przetwarzania migawek. Zobacz przykłady AUTO CDC FROM SNAPSHOT.

Aby uzyskać szczegółowe informacje o składni, zobacz create_auto_cdc_from_snapshot_flow.

Używanie wielu kolumn do sekwencjonowania

Aby sekwencjonować według wielu kolumn (na przykład znacznik czasu i identyfikator przerwania więzi), użyj elementu , STRUCT aby je połączyć. Interfejs API najpierw sortuje według pierwszego pola, a w przypadku remisu uwzględnia drugie pole i tak dalej.

SQL

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python

sequence_by = struct("timestamp_col", "id_col")

Przykłady usługi AUTO CDC

W poniższych przykładach pokazano przetwarzanie SCD typu 1 i 2 przy użyciu źródła strumienia danych zmian. Przykładowe dane tworzą nowe rekordy użytkownika, usuwają rekordy użytkownika i aktualizują rekordy użytkowników. W przykładzie SCD Type 1 ostatnie UPDATE operacje docierają późno i są usuwane z tabeli docelowej, demonstrując obsługę zdarzeń w nieprawidłowej kolejności.

Poniżej przedstawiono rekordy wejściowe używane w tych przykładach. Te dane są tworzone przez uruchomienie zapytania w sekcji Tworzenie przykładowych danych .

userId nazwa city operacja sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilia Cancun INSERT 2
123 null null USUŃ 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Jeśli anulujesz komentarz w ostatnim wierszu w zapytaniu generowania danych przykładowych, wstawia on następujący rekord, który określa obcięcie tabeli (wyczyść tabelę) w pliku sequenceNum=3:

userId nazwa city operacja sequenceNum
null null null SKRACAĆ 3

Uwaga / Notatka

Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE i TRUNCATE , ale każdy z nich jest opcjonalny.

Tworzenie przykładowych danych

Uruchom następujące instrukcje, aby utworzyć przykładowy zestaw danych. Ten kod nie jest przeznaczony do uruchomienia w ramach definicji potoku. Uruchom to z folderu eksploracji w twoim potoku danych, zamiast z folderu przekształceń.

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Przetwarzanie aktualizacji typu SCD 1

Typ SCD 1 przechowuje tylko najnowszą wersję każdego rekordu. Poniższy przykład odczytuje z powyższego kanału danych zmian i stosuje zmiany do docelowej tabeli przesyłania strumieniowego. Opracuj deklaratywne potoki Spark w usłudze Lakeflow, aby uruchomić ten kod.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
  target = "users_current",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

CREATE OR REFRESH STREAMING TABLE users_current;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_current
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:

userId nazwa city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilia Cancun

Użytkownik 123 (Isabel) został usunięty i nie jest wyświetlany. Użytkownik 125 (Mercedes) pokazuje tylko najnowsze miasto (Guadalajara), ponieważ typ SCD 1 zastępuje poprzednie wartości. Wcześniejsza wartość w UPDATE i sequenceNum=5 została porzucona, ponieważ pojawiła się późniejsza aktualizacja w sequenceNum=6.

Po uruchomieniu przykładu z odkomentowanym rekordem TRUNCATE tabela zostanie wyczyszczona pod adresem sequenceNum=3. Oznacza to, że rekordy 124 i 126 nie znajdują się w tabeli, a końcowa tabela docelowa zawiera tylko następujący rekord:

userId nazwa city
125 Mercedes Guadalajara

Przetwarzanie aktualizacji typu SCD 2

Typ SCD 2 zachowuje pełną historię zmian, tworząc nowe wiersze dla każdej wersji rekordu z kolumnami __START_AT i __END_AT wskazującymi, kiedy każda wersja była aktywna.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po uruchomieniu przykładu typu SCD 2 tabela docelowa zawiera następujące rekordy:

userId nazwa city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lilia Cancun 2 null

Tabela zachowuje pełną historię danych. Użytkownik 123 ma dwie wersje (zakończone sekwencją 6 po usunięciu). Użytkownik 125 ma trzy wersje pokazujące zmiany miasta. Rekordy z __END_AT = null są obecnie aktywne.

Śledzenie podzestawu kolumn przy użyciu typu SCD 2

Domyślnie typ SCD 2 tworzy nową wersję za każdym razem, gdy każda wartość kolumny ulegnie zmianie. Możesz określić podzbiór kolumn do śledzenia, aby zmiany w innych kolumnach aktualizowały bieżącą wersję zamiast generowania nowego rekordu historii.

Poniższy przykład wyklucza kolumnę city ze śledzenia historii:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Ponieważ city zmiany nie są śledzone, aktualizacje miejskie zastępują obecny wiersz zamiast tworzyć nową wersję. Tabela docelowa zawiera następujące rekordy:

userId nazwa city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lilia Cancun 2 null

Przykłady automatycznego CDC z migawki

W poniższych sekcjach przedstawiono przykłady użycia AUTO CDC FROM SNAPSHOT do przetwarzania obrazów danych w tabele docelowe typu SCD 1 lub SCD 2. Aby uzyskać informacje na temat tego, kiedy używać tego interfejsu API, zobacz Zmienianie przechwytywania i migawek danych.

Przykład: przetwarzanie migawek przy użyciu czasu pozyskiwania potoku

Użyj tego podejścia, gdy migawki są regularnie dostarczane i uporządkowane, a także można polegać na sygnaturze czasowej wykonania potoku do wersjonowania. Nowa migawka jest pozyskiwana wraz z każdą aktualizacją potoku.

Migawki można odczytywać z wielu typów źródeł, w tym z tabel Delta, plików w chmurze i połączeń JDBC.

Krok 1. Tworzenie przykładowych danych

Utwórz tabelę zawierającą dane migawki. Uruchom następujący kod z notatnika lub usługi Databricks SQL w folderze explorations Twojego potoku:

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

Krok 2. Uruchom automatyczny CDC z migawki

Aby uruchomić kod w tym kroku, twórz deklaratywne potoki Spark na platformie Lakeflow.

Wybierz typ źródła dla widoku migawki (przykładowy kod tworzenia generuje Tabelę Delta):

Opcja A: Odczyt z tabeli delty
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.table("main.cdc_tutorial.snapshot")
Opcja B: odczyt z magazynu w chmurze
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opcja C: Odczyt z JDBC (tylko obliczenia klasyczne)
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return (spark.read
    .format("jdbc")
    .option("url", "<jdbc-url>")
    .option("dbtable", "<table-name>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load()
  )

Wszystkie opcje, zapis do obiektu docelowego

Następnie dodaj tabelę docelową i przepływ pracy:

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = "source",
  keys = ["userId"],
  stored_as_scd_type = 2
)

Po pierwszym uruchomieniu procesu potokowego wszystkie rekordy są wstawiane jako aktywne zapisy.

userId city __START_AT __END_AT
1 Oaxaca 0 null
2 Monterrey 0 null
3 Tijuana 0 null

Uwaga / Notatka

Aby zamiast tego użyć typu SCD 1 i zachować tylko bieżący stan, ustaw wartość stored_as_scd_type=1. W tym przypadku tabela docelowa nie zawiera kolumn __START_AT i __END_AT.

Krok 3: Zasymuluj nową migawkę i uruchom ponownie

Zaktualizuj tabelę źródłową, aby zasymulować przybycie nowej momentki (uruchom ten kod z notebooka lub pliku SQL w folderze explorations swojej pipeline):

TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

Uruchom ponownie potok. AUTO CDC FROM SNAPSHOT Porównuje nową migawkę z poprzednią i wykrywa, że użytkownik 1 został usunięty, użytkownicy 2 i 3 zostały zaktualizowane, a użytkownicy 4 i 6 zostały wstawione. To generuje strumień zmian, i używa AUTO CDC do utworzenia tabeli wyjściowej.

Po drugim uruchomieniu z typem SCD 2 tabela docelowa zawiera następujące rekordy:

userId city __START_AT __END_AT
1 Oaxaca 0 1
2 Monterrey 0 1
2 Carmel 1 null
3 Tijuana 0 1
3 Los Angeles 1 null
4 Dolina Śmierci 1 null
6 Kanion Królów 1 null

Użytkownik 1 został zakończony (usunięty). Użytkownicy 2 i 3 mają dwie wersje pokazujące zmiany w mieście. Nowo wstawiono użytkowników 4 i 6.

Po drugim uruchomieniu z typem SCD 1 tabela docelowa zawiera tylko bieżący stan:

userId city
2 Carmel
3 Los Angeles
4 Dolina Śmierci
6 Kanion Królów

Przykład: przetwarzanie migawek przy użyciu funkcji wersji

Użyj tej metody, jeśli potrzebujesz jawnej kontroli nad kolejnością migawek. Na przykład użyj tego podejścia, gdy wiele migawek przychodzi w tym samym czasie, lub gdy migawki przychodzą w nieprawidłowej kolejności. Piszesz funkcję określającą, która migawka ma być przetwarzana jako następna oraz jej numer wersji. Interfejs API przetwarza migawki w kolejności rosnącej wersji:

  • Gdy wiele migawek jest przechowywanych, wszystkie są przetwarzane w kolejności.
  • Jeśli migawka pojawi się w niewłaściwej kolejności (na przykład snapshot_3 pojawi się po snapshot_4), zostanie pominięta.
  • Jeśli nie ma żadnych nowych migawek, funkcja zwraca None, a przetwarzanie nie odbywa się.

Krok 1. Przygotowywanie plików migawek

Utwórz pliki CSV zawierające dane migawek i dodaj je do lokalizacji zasobu danych lub przechowywania w chmurze. Nazwij pliki chronologicznie (na przykład snapshot_1.csv, snapshot_2.csv).

Każdy plik powinien zawierać kolumny dla userId i city. Przykład:

snapshot_1.csv:

userId city
1 Oaxaca
2 Monterrey
3 Tijuana

snapshot_2.csv:

userId city
2 Carmel
3 Los Angeles
4 Dolina Śmierci

Krok 2: Uruchom AUTO CDC FROM SNAPSHOT z funkcją wersji.

Utwórz nowy notes i wklej następujący kod potoku. Następnie opracuj deklaratywne potoki Lakeflow Spark.

from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

  files = dbutils.fs.ls(snapshot_dir)
  snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

  snapshot_versions = []
  for filename in snapshot_files:
    try:
      version = int(filename.replace("snapshot_", "").replace(".csv", ""))
      snapshot_versions.append(version)
    except ValueError:
      continue

  snapshot_versions.sort()

  if latest_snapshot_version is None:
    if snapshot_versions:
      next_version = snapshot_versions[0]
    else:
      return None
  else:
    next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
    if next_versions:
      next_version = next_versions[0]
    else:
      return None

  snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
  df = spark.read.format("csv").option("header", True).load(snapshot_path)
  return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
  target = "main.cdc_tutorial.target_versioned",
  source = next_snapshot_and_version,
  keys = ["userId"],
  stored_as_scd_type = 2
)

Uwaga / Notatka

Aby zamiast tego użyć typu SCD 1, ustaw wartość stored_as_scd_type=1.

Po przetworzeniu snapshot_1.csvtabela docelowa zawiera następujące rekordy:

userId city __START_AT __END_AT
1 Oaxaca 1 null
2 Monterrey 1 null
3 Tijuana 1 null

Po przetworzeniu snapshot_2.csvtabela docelowa zawiera następujące rekordy:

userId city __START_AT __END_AT
1 Oaxaca 1 2
2 Monterrey 1 2
2 Carmel 2 null
3 Tijuana 1 2
3 Los Angeles 2 null
4 Dolina Śmierci 2 null

Uwaga / Notatka

Pamiętaj, że w przypadku typu SCD 1 tabela wygląda dokładnie tak jak najnowsza migawka. Różnica polega na tym, że zapytania podrzędne mogą używać zestawienia zmian tylko do przetwarzania zmienionych rekordów.

Krok 3: Dodaj nowe migawki

Dodaj nowy plik CSV do miejsca przechowywania z danymi, które zostały zmodyfikowane (na przykład ze zmienionymi wartościami miast, nowymi wierszami lub usuniętymi wierszami). Następnie ponownie uruchom potok, aby przetworzyć nową migawkę.

Ograniczenia

  • Kolumna sekwencjonowania musi być sortowalnym typem danych. NULL wartości sekwencjonowania nie są obsługiwane.
  • AUTO CDC FROM SNAPSHOT jest obsługiwany tylko w interfejsie potoku języka Python; interfejs SQL nie jest obsługiwany.

Dodatkowe zasoby