Udostępnij za pomocą


create_auto_cdc_flow

Funkcja create_auto_cdc_flow() tworzy przepływ, który używa funkcji przechwytywania zmian danych (CDC) w deklaratywnych potokach Lakeflow Spark do przetwarzania danych źródłowych z kanału zmian danych (CDF).

Uwaga / Notatka

Ta funkcja zastępuje poprzednią funkcję apply_changes(). Te dwie funkcje mają ten sam podpis. Usługa Databricks zaleca aktualizację, aby używać nowej nazwy.

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu create_auto_cdc_flow() tabeli docelowej należy uwzględnić __START_AT kolumny i __END_AT z tym samym typem danych co sequence_by pola.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python potoku.

Składnia

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

W przypadku przetwarzania za pomocą create_auto_cdc_flow, domyślne zachowanie dla zdarzeń INSERT i UPDATE polega na upsert zdarzeń CDC ze źródła: zaktualizować dowolne wiersze w tabeli docelowej, które odpowiadają określonym kluczom, lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić za pomocą parametru apply_as_deletes .

Aby dowiedzieć się więcej na temat przetwarzania CDC za pomocą zestawienia zmian, zobacz Interfejsy API AUTO CDC: Upraszczanie przechwytywania zmian danych za pomocą potoków. Aby zapoznać się z przykładem użycia funkcji create_auto_cdc_flow(), zobacz Przykład: przetwarzanie typów SCD 1 i SCD 2 z danymi źródłowymi CDF.

Parametry

Parameter Typ Description
target str To jest wymagane. Nazwa tabeli do zaktualizowania. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem create_auto_cdc_flow() funkcji.
source str To jest wymagane. Źródło danych zawierające rekordy CDC.
keys list To jest wymagane. Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Możesz określić jedną z następujących opcji:
  • Lista ciągów: ["userId", "orderId"]
  • Lista funkcji Spark SQL col() : [col("userId"), col("orderId")]. Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
sequence_by str, col() lub struct() To jest wymagane. Nazwy kolumn określające logiczną kolejność zdarzeń CDC w danych źródłowych. Lakeflow Spark Deklaratywne Potoki używają tego sekwencjonowania do obsługi zdarzeń zmiany, które przychodzą w niewłaściwej kolejności. Określona kolumna musi być sortowalnym typem danych. Możesz określić jedną z następujących opcji:
  • Ciąg: "sequenceNum"
  • Funkcja Spark SQL col() : col("sequenceNum"). Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
  • struct() Łączenie wielu kolumn w celu rozstrzygnięcia remisów: struct("timestamp_col", "id_col"), najpierw nastąpi sortowanie według pierwszego pola struktury, następnie według drugiego pola w przypadku remisu, itd.
ignore_null_updates bool Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates jest True, kolumny z null zachowują swoje istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null. Kiedy ignore_null_updates to False, istniejące wartości zostają nadpisane wartościami null.
Wartość domyślna to False.
apply_as_deletes str lub expr() Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE zamiast upsert. Możesz określić jedną z następujących opcji:
  • Ciąg: "Operation = 'DELETE'"
  • Funkcja Spark SQL expr() : expr("Operation = 'DELETE'")

Aby obsłużyć dane nie w kolejności, usunięty wiersz jest tymczasowo zachowywany jako znacznik usunięcia w podstawowej tabeli Delta, a widok jest tworzony w katalogu metadanych, który filtruje te znaczniki usunięcia. Interwał przechowywania jest domyślnie ustawiony na dwa dni i można go skonfigurować za pomocą pipelines.cdc.tombstoneGCThresholdInSeconds właściwości tabeli.
apply_as_truncates str lub expr() Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Możesz określić jedną z następujących opcji:
  • Ciąg: "Operation = 'TRUNCATE'"
  • Funkcja Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Ponieważ ta klauzula wyzwala pełne trunkowanie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia, jeśli jest to wymagane przez daną funkcjonalność. Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania.
column_list lub except_column_list list Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list polecenia , aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list, aby określić kolumny do wykluczenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId). Domyślnie dołączane są wszystkie kolumny do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument column_list lub except_column_list.
stored_as_scd_type str lub int Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2. Wartość domyślna to SCD typ 1.
track_history_column_list lub track_history_except_column_list list Podzbiór kolumn wyjściowych do śledzenia historii zmian w tabeli docelowej. Użyj track_history_column_list polecenia , aby określić pełną listę kolumn do śledzenia. Użyj track_history_except_column_list polecenia , aby określić kolumny, które mają być wykluczone ze śledzenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId). Domyślnie dołączane są wszystkie kolumny do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument track_history_column_list lub track_history_except_column_list.
name str Nazwa przepływu. Jeśli nie zostanie podana, wartość domyślna to ta sama wartość co target.
once bool Opcjonalnie zdefiniuj przepływ jako przepływ jednorazowy, taki jak wypełnienie wsteczne. Używanie once=True zmienia przepływ na dwa sposoby:
  • Wartość zwracana. streaming-query. w tym przypadku musi być wsadową ramką danych, a nie strumieniową ramką danych.
  • Domyślnie przepływ jest uruchamiany jeden raz. W przypadku zaktualizowania pipeline'u przez pełne odświeżenie, przepływ ONCE zostanie uruchomiony ponownie w celu odtworzenia danych.