Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Funkcja create_auto_cdc_flow() tworzy przepływ, który używa funkcji przechwytywania zmian danych (CDC) w deklaratywnych potokach Lakeflow Spark do przetwarzania danych źródłowych z kanału zmian danych (CDF).
Uwaga / Notatka
Ta funkcja zastępuje poprzednią funkcję apply_changes(). Te dwie funkcje mają ten sam podpis. Usługa Databricks zaleca aktualizację, aby używać nowej nazwy.
Ważne
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu create_auto_cdc_flow() tabeli docelowej należy uwzględnić __START_AT kolumny i __END_AT z tym samym typem danych co sequence_by pola.
Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python potoku.
Składnia
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
W przypadku przetwarzania za pomocą create_auto_cdc_flow, domyślne zachowanie dla zdarzeń INSERT i UPDATE polega na upsert zdarzeń CDC ze źródła: zaktualizować dowolne wiersze w tabeli docelowej, które odpowiadają określonym kluczom, lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić za pomocą parametru apply_as_deletes .
Aby dowiedzieć się więcej na temat przetwarzania CDC za pomocą zestawienia zmian, zobacz Interfejsy API AUTO CDC: Upraszczanie przechwytywania zmian danych za pomocą potoków. Aby zapoznać się z przykładem użycia funkcji create_auto_cdc_flow(), zobacz Przykład: przetwarzanie typów SCD 1 i SCD 2 z danymi źródłowymi CDF.
Parametry
| Parameter | Typ | Description |
|---|---|---|
target |
str |
To jest wymagane. Nazwa tabeli do zaktualizowania. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem create_auto_cdc_flow() funkcji. |
source |
str |
To jest wymagane. Źródło danych zawierające rekordy CDC. |
keys |
list |
To jest wymagane. Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Możesz określić jedną z następujących opcji:
|
sequence_by |
str, col() lub struct() |
To jest wymagane. Nazwy kolumn określające logiczną kolejność zdarzeń CDC w danych źródłowych. Lakeflow Spark Deklaratywne Potoki używają tego sekwencjonowania do obsługi zdarzeń zmiany, które przychodzą w niewłaściwej kolejności. Określona kolumna musi być sortowalnym typem danych. Możesz określić jedną z następujących opcji:
|
ignore_null_updates |
bool |
Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates jest True, kolumny z null zachowują swoje istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null. Kiedy ignore_null_updates to False, istniejące wartości zostają nadpisane wartościami null.Wartość domyślna to False. |
apply_as_deletes |
str lub expr() |
Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE zamiast upsert. Możesz określić jedną z następujących opcji:
Aby obsłużyć dane nie w kolejności, usunięty wiersz jest tymczasowo zachowywany jako znacznik usunięcia w podstawowej tabeli Delta, a widok jest tworzony w katalogu metadanych, który filtruje te znaczniki usunięcia. Interwał przechowywania jest domyślnie ustawiony na dwa dni i można go skonfigurować za pomocą pipelines.cdc.tombstoneGCThresholdInSeconds właściwości tabeli. |
apply_as_truncates |
str lub expr() |
Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Możesz określić jedną z następujących opcji:
Ponieważ ta klauzula wyzwala pełne trunkowanie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia, jeśli jest to wymagane przez daną funkcjonalność. Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania. |
column_list lub except_column_list |
list |
Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list polecenia , aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list, aby określić kolumny do wykluczenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId). Domyślnie dołączane są wszystkie kolumny do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument column_list lub except_column_list. |
stored_as_scd_type |
str lub int |
Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2. Wartość domyślna to SCD typ 1. |
track_history_column_list lub track_history_except_column_list |
list |
Podzbiór kolumn wyjściowych do śledzenia historii zmian w tabeli docelowej. Użyj track_history_column_list polecenia , aby określić pełną listę kolumn do śledzenia. Użyj track_history_except_column_list polecenia , aby określić kolumny, które mają być wykluczone ze śledzenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
Argumenty funkcji nie mogą zawierać kwalifikatorów col(). Można na przykład użyć col(userId), ale nie można użyć col(source.userId). Domyślnie dołączane są wszystkie kolumny do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument track_history_column_list lub track_history_except_column_list. |
name |
str |
Nazwa przepływu. Jeśli nie zostanie podana, wartość domyślna to ta sama wartość co target. |
once |
bool |
Opcjonalnie zdefiniuj przepływ jako przepływ jednorazowy, taki jak wypełnienie wsteczne. Używanie once=True zmienia przepływ na dwa sposoby:
|