Udostępnij za pośrednictwem


AUTO CDC INTO (Deklaratywne potoki Lakeflow)

Użyj instrukcji AUTO CDC ... INTO, aby utworzyć przepływ, który używa funkcji przechwytywania danych zmieniających się (CDC) w ramach Lakeflow Declarative Pipelines. Ta instrukcja odczytuje zmiany ze źródła CDC i stosuje je do obiektu docelowego przesyłania strumieniowego.

Składnia

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Definiujesz ograniczenia dotyczące jakości danych dla celu, używając tej samej klauzuli CONSTRAINT co inne zapytania Lakeflow Deklaratywne Potoki. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących przepływu danych.

Domyślnym działaniem zdarzeń INSERT i UPDATE jest wykonanie upsert zdarzeń CDC ze źródła: zaktualizowanie wierszy w tabeli docelowej, które odpowiadają określonym kluczom, lub wstawienie nowego wiersza, jeśli pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić warunkiem APPLY AS DELETE WHEN.

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. W przypadku tabel typu SCD 2 podczas określania schematu tabeli docelowej należy również uwzględnić kolumny __START_AT i __END_AT z tym samym typem danych co pole sequence_by.

Zobacz Interfejsy API AUTO CDC: Uproszczenie przechwytywania danych zmian za pomocą deklaratywnych potoków Lakeflow.

Parametry

  • flow_name

    Nazwa przepływu do utworzenia.

  • source

    Źródło danych. Źródło musi być źródłem przesyłania strumieniowego . Użyj słowa kluczowego STREAM, aby stosować semantykę przesyłania strumieniowego do odczytu ze źródła. Jeśli odczyt napotka zmianę lub usunięcie istniejącego rekordu, zostanie zgłoszony błąd. Najbezpieczniej jest odczytywać dane ze źródeł statycznych lub takich, do których można dodawać dane, ale nie można ich modyfikować ani usuwać. Aby pozyskiwać dane, które mają zatwierdzenia zmian, możesz użyć Pythona i opcji SkipChangeCommits do obsługi błędów.

    Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą potoków.

  • KEYS

    Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Wartości w tych kolumnach służą do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

    Aby zdefiniować kombinację kolumn, użyj rozdzielanej przecinkami listy kolumn.

    Ta klauzula ia jest wymagana.

  • IGNORE NULL UPDATES

    Umożliwia przyjmowanie aktualizacji zawierających część kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone, że należy ignorować aktualizacje o wartości NULL, kolumny z wartością null zachowają swoje istniejące wartości w elemencie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null .

    Ta klauzula jest opcjonalna.

    Wartością domyślną jest zastąpienie istniejących kolumn wartościami null .

  • APPLY AS DELETE WHEN

    Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE zamiast upsert.

    W przypadku źródeł typu 2 SCD, do obsługi danych poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako nagrobek w podstawowej tabeli Delta, a widok jest tworzony w metadanych, który odfiltrowuje te nagrobki. Interwał przechowywania można skonfigurować za pipelines.cdc.tombstoneGCThresholdInSeconds pomocą właściwości tabeli.

    Ta klauzula jest opcjonalna.

  • APPLY AS TRUNCATE WHEN

    Określa, kiedy zdarzenie CDC powinno być rozpatrywane jako cała tabela TRUNCATE. Ponieważ ta klauzula wyzwala całkowite usunięcie danych z tabeli docelowej, powinna być używana tylko w konkretnych przypadkach użycia wymagających tej funkcjonalności.

    Klauzula APPLY AS TRUNCATE WHEN jest obsługiwana tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania.

    Ta klauzula jest opcjonalna.

  • SEQUENCE BY

    Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Deklaratywne potoki Lakeflow używają tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają w nieodpowiedniej kolejności.

    Jeśli do sekwencjonowania jest potrzebnych wiele kolumn, użyj wyrażenia STRUCT: najpierw uporządkuje według pierwszego pola struktury, następnie według drugiego pola, jeśli wystąpi remis, i tak dalej w kolejności.

    Określone kolumny muszą być sortowalnymi typami danych.

    Ta klauzula jest wymagana.

  • COLUMNS

    Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Masz dwie możliwości:

    • Określ pełną listę kolumn do uwzględnienia: COLUMNS (userId, name, city).
    • Określ listę kolumn do wykluczenia: COLUMNS * EXCEPT (operation, sequenceNum)

    Ta klauzula jest opcjonalna.

    Domyślnie wszystkie kolumny są dołączane do tabeli docelowej, gdy klauzula COLUMNS nie jest określona.

  • STORED AS

    Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

    Ta klauzula jest opcjonalna.

    Wartość domyślna to SCD typ 1.

  • TRACK HISTORY ON

    Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Masz dwie możliwości:

    • Określ pełną listę kolumn do śledzenia: COLUMNS (userId, name, city).
    • Określ listę kolumn, które mają być wykluczone ze śledzenia: COLUMNS * EXCEPT (operation, sequenceNum)

    Ta klauzula jest opcjonalna. Wartością domyślną jest śledzenie historii wszystkich kolumn wyjściowych w przypadku jakichkolwiek zmian, co odpowiada TRACK HISTORY ON *.

Przykłady

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);