Udostępnij za pomocą


create_auto_cdc_from_snapshot_flow

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Funkcja create_auto_cdc_from_snapshot_flow tworzy przepływ, który używa funkcji przechwytywania zmian danych w potokach deklaratywnych (CDC) usługi Lakeflow Spark w celu przetwarzania danych źródłowych z migawek bazy danych. Zobacz Jak usługa CDC jest implementowana za pomocą interfejsu AUTO CDC FROM SNAPSHOT API?.

Uwaga / Notatka

Ta funkcja zastępuje poprzednią funkcję apply_changes_from_snapshot(). Te dwie funkcje mają ten sam podpis. Usługa Databricks zaleca aktualizację, aby używać nowej nazwy.

Ważne

Dla tej operacji musisz mieć docelową tabelę streamingu.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table().

Składnia

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Uwaga / Notatka

W przypadku AUTO CDC FROM SNAPSHOT przetwarzania domyślne zachowanie polega na wstawieniu nowego wiersza, gdy pasujący rekord z tymi samymi kluczami nie istnieje w obiekcie docelowym. Jeśli istnieje pasujący rekord, jest aktualizowany tylko wtedy, gdy którakolwiek z wartości w wierszu uległa zmianie. Wiersze z kluczami obecnymi w obiekcie docelowym, ale nieobecnymi już w źródle, są usuwane.

Aby dowiedzieć się więcej na temat przetwarzania CDC za pomocą migawek, zobacz artykuł Interfejsy API AUTO CDC: Uproszczenie przechwytywania zmian danych za pomocą potoków. Przykłady użycia funkcji można znaleźć w przykładach okresowego pozyskiwania migawek oraz historycznego pozyskiwania migawek.

Parametry

Parameter Typ Description
target str To jest wymagane. Nazwa tabeli do zaktualizowania. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem create_auto_cdc_from_snapshot_flow() funkcji.
source str lub lambda function To jest wymagane. Nazwa tabeli lub widoku do okresowych migawek albo funkcja lambda w języku Python, która zwraca ramkę danych migawki do przetworzenia oraz wersję migawki. Zobacz Implementowanie argumentusource.
keys list To jest wymagane. Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Możesz określić jedną z następujących opcji:
  • Lista ciągów: ["userId", "orderId"]
  • Lista funkcji Spark SQL col() : [col("userId"), col("orderId"]. Argumenty do funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć elementu col(userId), ale nie można użyć elementu col(source.userId).
stored_as_scd_type str lub int Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2. Wartość domyślna to SCD typ 1.
track_history_column_list lub track_history_except_column_list list Podzbiór kolumn wyjściowych do śledzenia historii zmian w tabeli docelowej. Użyj track_history_column_list polecenia , aby określić pełną listę kolumn do śledzenia. Użyj track_history_except_column_list polecenia , aby określić kolumny, które mają być wykluczone ze śledzenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty do funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć elementu col(userId), ale nie można użyć elementu col(source.userId). Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy do funkcji nie zostanie przekazany argument track_history_column_list lub track_history_except_column_list.

Implementowanie argumentu source

Funkcja create_auto_cdc_from_snapshot_flow() zawiera source argument . W przypadku przetwarzania migawek historycznych argument source powinien być funkcją lambda w Pythonie, która zwraca dwie wartości do funkcji create_auto_cdc_from_snapshot_flow(): Pythonowy DataFrame zawierający dane migawki do przetworzenia oraz wersję migawki.

Poniżej znajduje się podpis funkcji lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argument funkcji lambda jest ostatnio przetworzoną wersją migawki.
  • Wartość zwracana funkcji lambda to None lub krotka dwóch wartości: Pierwsza wartość krotki to ramka danych zawierająca migawkę do przetworzenia. Drugą wartością krotki jest wersja migawki, która reprezentuje logiczną kolejność migawki.

Przykład, który implementuje i wywołuje funkcję lambda:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Środowisko uruchomieniowe Deklaratywne Potoki Lakeflow Spark wykonuje następujące kroki za każdym razem, gdy potok zawierający funkcję create_auto_cdc_from_snapshot_flow() jest wyzwalany:

  1. Uruchamia funkcję next_snapshot_and_version w celu załadowania następnego DataFrame migawki i odpowiedniej wersji migawki.
  2. Jeśli żaden DataFrame nie zostanie zwrócony, przebieg zostanie zakończony, a aktualizacja procesu potokowego zostanie oznaczona jako ukończona.
  3. Wykrywa zmiany w nowej migawce i inkrementalnie stosuje je do tabeli docelowej.
  4. Wraca do kroku 1, aby załadować następną migawkę i jej wersję.