Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Użyj instrukcji AUTO CDC ... INTO
, aby utworzyć przepływ, który używa funkcji przechwytywania danych zmieniających się (CDC) w ramach Lakeflow Declarative Pipelines. Ta instrukcja odczytuje zmiany ze źródła CDC i stosuje je do obiektu docelowego przesyłania strumieniowego.
- Aby dowiedzieć się więcej o usłudze CDC, zobacz Co to jest przechwytywanie zmian danych (CDC)?.
- Aby uzyskać więcej informacji na temat korzystania z
AUTO CDC
, zobacz API AUTO CDC: Uproszczenie przechwytywania zmian danych z pomocą potoków deklaratywnych Lakeflow. - Aby uzyskać więcej informacji na temat
CREATE FLOW
, zobacz CREATE FLOW (Lakeflow Declarative Pipelines).
Składnia
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Definiujesz ograniczenia dotyczące jakości danych dla celu, używając tej samej klauzuli CONSTRAINT
co inne zapytania Lakeflow Deklaratywne Potoki. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących przepływu danych.
Domyślnym działaniem zdarzeń INSERT
i UPDATE
jest wykonanie upsert zdarzeń CDC ze źródła: zaktualizowanie wierszy w tabeli docelowej, które odpowiadają określonym kluczom, lub wstawienie nowego wiersza, jeśli pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE
można określić warunkiem APPLY AS DELETE WHEN
.
Ważne
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. W przypadku tabel typu SCD 2 podczas określania schematu tabeli docelowej należy również uwzględnić kolumny __START_AT
i __END_AT
z tym samym typem danych co pole sequence_by
.
Parametry
flow_name
Nazwa przepływu do utworzenia.
source
Źródło danych. Źródło musi być źródłem przesyłania strumieniowego . Użyj słowa kluczowego STREAM, aby stosować semantykę przesyłania strumieniowego do odczytu ze źródła. Jeśli odczyt napotka zmianę lub usunięcie istniejącego rekordu, zostanie zgłoszony błąd. Najbezpieczniej jest odczytywać dane ze źródeł statycznych lub takich, do których można dodawać dane, ale nie można ich modyfikować ani usuwać. Aby pozyskiwać dane, które mają zatwierdzenia zmian, możesz użyć Pythona i opcji
SkipChangeCommits
do obsługi błędów.Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą potoków.
KEYS
Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Wartości w tych kolumnach służą do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.
Aby zdefiniować kombinację kolumn, użyj rozdzielanej przecinkami listy kolumn.
Ta klauzula ia jest wymagana.
IGNORE NULL UPDATES
Umożliwia przyjmowanie aktualizacji zawierających część kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone, że należy ignorować aktualizacje o wartości NULL, kolumny z wartością
null
zachowają swoje istniejące wartości w elemencie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartościąnull
.Ta klauzula jest opcjonalna.
Wartością domyślną jest zastąpienie istniejących kolumn wartościami
null
.APPLY AS DELETE WHEN
Określa, kiedy zdarzenie CDC powinno być traktowane jako
DELETE
zamiast upsert.W przypadku źródeł typu 2 SCD, do obsługi danych poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako nagrobek w podstawowej tabeli Delta, a widok jest tworzony w metadanych, który odfiltrowuje te nagrobki. Interwał przechowywania można skonfigurować za
pipelines.cdc.tombstoneGCThresholdInSeconds
pomocą właściwości tabeli.Ta klauzula jest opcjonalna.
APPLY AS TRUNCATE WHEN
Określa, kiedy zdarzenie CDC powinno być rozpatrywane jako cała tabela
TRUNCATE
. Ponieważ ta klauzula wyzwala całkowite usunięcie danych z tabeli docelowej, powinna być używana tylko w konkretnych przypadkach użycia wymagających tej funkcjonalności.Klauzula
APPLY AS TRUNCATE WHEN
jest obsługiwana tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania.Ta klauzula jest opcjonalna.
SEQUENCE BY
Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Deklaratywne potoki Lakeflow używają tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają w nieodpowiedniej kolejności.
Jeśli do sekwencjonowania jest potrzebnych wiele kolumn, użyj wyrażenia
STRUCT
: najpierw uporządkuje według pierwszego pola struktury, następnie według drugiego pola, jeśli wystąpi remis, i tak dalej w kolejności.Określone kolumny muszą być sortowalnymi typami danych.
Ta klauzula jest wymagana.
COLUMNS
Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Masz dwie możliwości:
- Określ pełną listę kolumn do uwzględnienia:
COLUMNS (userId, name, city)
. - Określ listę kolumn do wykluczenia:
COLUMNS * EXCEPT (operation, sequenceNum)
Ta klauzula jest opcjonalna.
Domyślnie wszystkie kolumny są dołączane do tabeli docelowej, gdy klauzula
COLUMNS
nie jest określona.- Określ pełną listę kolumn do uwzględnienia:
STORED AS
Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.
Ta klauzula jest opcjonalna.
Wartość domyślna to SCD typ 1.
TRACK HISTORY ON
Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Masz dwie możliwości:
- Określ pełną listę kolumn do śledzenia:
COLUMNS (userId, name, city)
. - Określ listę kolumn, które mają być wykluczone ze śledzenia:
COLUMNS * EXCEPT (operation, sequenceNum)
Ta klauzula jest opcjonalna. Wartością domyślną jest śledzenie historii wszystkich kolumn wyjściowych w przypadku jakichkolwiek zmian, co odpowiada
TRACK HISTORY ON *
.- Określ pełną listę kolumn do śledzenia:
Przykłady
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);