Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Użyj instrukcji AUTO CDC ... INTO, aby utworzyć przepływ wykorzystujący funkcję przechwytywania zmian danych (CDC) w Lakeflow Spark Declarative Pipelines. Ta instrukcja odczytuje zmiany ze źródła CDC i stosuje je do obiektu docelowego przesyłania strumieniowego.
- Aby dowiedzieć się więcej o usłudze CDC, zobacz Co to jest przechwytywanie zmian danych (CDC)?.
- Aby uzyskać więcej informacji na temat korzystania z systemu
AUTO CDC, zobacz AUTO CDC API: Uproszczenie przechwytywania danych o zmianach za pomocą potoków. - Aby uzyskać więcej informacji na temat
CREATE FLOW, zobacz w CREATE FLOW (pipelines).
Składnia
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Definiujesz ograniczenia jakości danych dla celu, używając tej samej klauzuli CONSTRAINT, co inne zapytania potoku. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących przepływu danych.
Domyślne zachowanie dla zdarzeń INSERT i UPDATE polega na upsercie zdarzeń CDC ze źródła: aktualizuje wiersze w tabeli docelowej, które pasują do określonych kluczy, lub wstawia nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić za pomocą APPLY AS DELETE WHEN warunku.
Ważne
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. W przypadku tabel typu SCD 2 podczas określania schematu tabeli docelowej należy również uwzględnić kolumny __START_AT i __END_AT z takim samym typem danych jak pole sequence_by.
Zobacz Interfejsy API AUTO CDC: upraszczają przechwytywanie zmian danych za pomocą potoków.
Parametry
flow_nameNazwa przepływu do utworzenia.
sourceŹródło danych. Źródło musi być strumieniowe. Użyj słowa kluczowego STREAM, aby stosować semantykę przesyłania strumieniowego do odczytu ze źródła. Jeśli odczyt napotka zmianę lub usunięcie istniejącego rekordu, zostanie zgłoszony błąd. Odczyt ze źródeł statycznych lub dołączanych jest najbezpieczniejszy. Aby pozyskiwać dane, które mają zatwierdzenia zmian, możesz użyć Pythona i opcji
SkipChangeCommitsdo obsługi błędów.Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą potoków.
KEYSKolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Wartości w tych kolumnach służą do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.
Aby zdefiniować kombinację kolumn, użyj rozdzielanej przecinkami listy kolumn.
Ta klauzula jest wymagana.
IGNORE NULL UPDATESUmożliwia przyjmowanie aktualizacji zawierających podzbiór kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone ignoruj aktualizacje o wartości NULL, kolumny z wartością
nullzachowają istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartościąnull.Ta klauzula jest opcjonalna.
Wartością domyślną jest zastąpienie istniejących kolumn wartościami
null.APPLY AS DELETE WHENOkreśla, kiedy zdarzenie CDC powinno być traktowane jako
DELETEzamiast upsert.W przypadku źródeł typu SCD 2, aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako znacznik usunięcia w podstawowej tabeli Delta, a widok jest tworzony w magazynie metadanych, który odfiltrowuje te znaczniki. Interwał przechowywania można skonfigurować za
pipelines.cdc.tombstoneGCThresholdInSecondspomocą właściwości tabeli.Ta klauzula jest opcjonalna.
APPLY AS TRUNCATE WHENOkreśla, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela
TRUNCATE. Ponieważ ta klauzula wyzwala pełne trunkowanie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia, jeśli jest to wymagane przez daną funkcjonalność.Klauzula jest obsługiwana
APPLY AS TRUNCATE WHENtylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji obcinania.Ta klauzula jest opcjonalna.
SEQUENCE BYNazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Przetwarzanie potoku używa tej sekwencji w celu obsługi zdarzeń zmiany, które docierają w niewłaściwej kolejności.
Jeśli do sekwencjonowania jest potrzebnych wiele kolumn, użyj wyrażenia
STRUCT: najpierw będzie ono uporządkowane według pierwszego pola struktury, a następnie przez drugie pole, jeśli istnieje remis, i tak dalej.Określone kolumny muszą być sortowalnymi typami danych.
Ta klauzula jest wymagana.
COLUMNSOkreśla podzbiór kolumn do uwzględnienia w tabeli docelowej. Masz dwie możliwości:
- Określ pełną listę kolumn do uwzględnienia:
COLUMNS (userId, name, city). - Określ listę kolumn do wykluczenia:
COLUMNS * EXCEPT (operation, sequenceNum)
Ta klauzula jest opcjonalna.
Domyślnie wszystkie kolumny są dołączane do tabeli docelowej, gdy klauzula
COLUMNSnie jest określona.- Określ pełną listę kolumn do uwzględnienia:
STORED ASOkreśla, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.
Ta klauzula jest opcjonalna.
Wartość domyślna to SCD typ 1.
TRACK HISTORY ONOkreśla podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Masz dwie możliwości:
- Określ pełną listę kolumn do śledzenia:
COLUMNS (userId, name, city). - Określ listę kolumn, które mają być wykluczone ze śledzenia:
COLUMNS * EXCEPT (operation, sequenceNum)
Ta klauzula jest opcjonalna. Wartości domyślnej odpowiada śledzenie historii dla wszystkich kolumn wynikowych, gdy zachodzą jakiekolwiek zmiany, co jest równoważne
TRACK HISTORY ON *.- Określ pełną listę kolumn do śledzenia:
Przykłady
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);