Udostępnij za pośrednictwem


Używanie strumienia danych zmian w Delta Lake na platformie Azure Databricks

Zestawienie zmian danych umożliwia usłudze Azure Databricks śledzenie zmian na poziomie wiersza między wersjami tabeli delty. Po włączeniu na tabeli Delta, środowisko uruchomieniowe rejestruje zdarzenia zmiany dla wszystkich danych zapisanych w tabeli. Obejmuje to dane wierszy wraz z metadanymi wskazującymi, czy określony wiersz został wstawiony, usunięty lub zaktualizowany.

Ważne

Zestawienie zmian danych działa razem z historią tabel w celu udostępnienia informacji o zmianach. Ponieważ klonowanie tabeli delty tworzy oddzielną historię, zestawienie danych zmian w sklonowanych tabelach nie jest zgodne z oryginalną tabelą.

Przyrostowe przetwarzanie zmian danych

Firma Databricks zaleca użycie funkcji zmian danych w połączeniu z Structured Streaming, aby stopniowo przetwarzać zmiany w tabelach Delta. Musisz użyć Strukturalnego Przesyłania Strumieniowego w Azure Databricks, aby automatycznie śledzić wersje przepływu danych o zmianach w tabeli.

Uwaga

Deklaratywne Potoki Lakeflow udostępniają funkcje do łatwego propagowania danych o zmianach i przechowywania wyników jako tabeli SCD (powoli zmieniający się wymiar) typu 1 lub 2. Zobacz Interfejsy API AUTO CDC: Uproszczenie przechwytywania danych zmian za pomocą deklaratywnych potoków Lakeflow.

Aby odczytać strumień danych o zmianach z tabeli, musisz włączyć go dla tej tabeli. Zobacz Włączanie strumienia danych zmian.

Ustaw opcję readChangeFeed na true podczas konfigurowania strumienia względem tabeli, aby odczytać strumień danych zmian, jak pokazano w poniższym przykładzie składni.

Pyton

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Skala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Domyślnie strumień zwraca najnowszą migawkę tabeli przy pierwszym uruchomieniu jako INSERT oraz przyszłe zmiany jako dane zmian.

Zatwierdzenie danych następuje jako część transakcji Delta Lake i stają się dostępne jednocześnie, gdy nowe dane zostają zatwierdzone w tabeli.

Opcjonalnie możesz określić wersję początkową. Zobacz Czy należy określić wersję początkową?.

Kanał danych zmian obsługuje również przetwarzanie wsadowe, które wymaga określenia wersji początkowej. Zobacz Przegląd zmian w zapytaniach wsadowych.

Opcje, takie jak limity szybkości (maxFilesPerTrigger, maxBytesPerTrigger) i excludeRegex są również obsługiwane podczas odczytywania danych zmiany.

Ograniczanie szybkości może być atomowe dla wersji innych niż początkowa wersja migawki. Oznacza to, że cała wersja zatwierdzenia będzie ograniczona lub zostanie zwrócone całe zatwierdzenie.

Czy należy określić wersję początkową?

Opcjonalnie możesz określić wersję początkową, jeśli chcesz zignorować zmiany, które wystąpiły przed określoną wersją. Wersję można określić przy użyciu znacznika czasu lub numeru identyfikatora wersji zarejestrowanego w dzienniku transakcji delty.

Uwaga

Do odczytu wsadowego wymagana jest wersja początkowa, a wiele wzorców wsadowych może skorzystać z ustawienia opcjonalnej wersji końcowej.

Podczas konfigurowania obciążeń przesyłania strumieniowego związanych z pasem danych zmian ważne jest, aby zrozumieć, jak określenie wersji początkowej wpływa na przetwarzanie.

Wiele obciążeń przesyłania strumieniowego, zwłaszcza nowych potoków przetwarzania danych, korzysta z domyślnego zachowania. W przypadku zachowania domyślnego pierwsza partia jest przetwarzana, gdy strumień najpierw rejestruje wszystkie istniejące rekordy w tabeli jako INSERT operacje w strumieniu danych zmian.

Jeśli tabela docelowa zawiera już wszystkie zapisy z odpowiednimi zmianami do pewnego momentu, określ wersję początkową, aby uniknąć przetwarzania stanu tabeli źródłowej jako INSERT zdarzeń.

Następująca przykładowa składnia odzyskiwania po niepowodzeniu przesyłania strumieniowego, w którym punkt kontrolny został uszkodzony. W tym przykładzie przyjęto założenie, że spełnione są następujące warunki:

  1. Strumień danych o zmianach został włączony w tabeli źródłowej podczas jej tworzenia.
  2. Docelowa tabela podrzędna przetworzyła wszystkie zmiany do wersji 75 włącznie.
  3. Historia wersji tabeli źródłowej jest dostępna dla wersji 70 lub nowszych.

Pyton

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Skala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

W tym przykładzie należy również określić nową lokalizację punktu kontrolnego.

Ważne

Jeśli określisz wersję początkową, strumień nie może rozpocząć się od nowego punktu kontrolnego, jeśli wersja początkowa nie jest już obecna w historii tabeli. Usługa Delta Lake automatycznie czyści historyczne wersje, co oznacza, że wszystkie określone wersje początkowe zostaną ostatecznie usunięte.

Zobacz Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?.

Odczyt zmian w zapytaniach wsadowych

Składnia zapytań wsadowych umożliwia odczytywanie wszystkich zmian rozpoczynających się od określonej wersji lub odczytywanie zmian w określonym zakresie wersji.

Należy określić wersję jako liczbę całkowitą oraz znacznik czasu jako ciąg znaków w formacie yyyy-MM-dd[ HH:mm:ss[.SSS]].

Wersje początkowe i końcowe są uwzględniane w zapytaniach. Aby odczytać zmiany z określonej wersji początkowej do najnowszej wersji tabeli, określ tylko wersję początkową.

Jeśli podasz wersję niższą lub sygnaturę czasową starszą niż ta, która zarejestrowała zdarzenia zmiany — to znaczy, gdy strumień danych o zmianach został włączony — zostanie zgłoszony błąd wskazujący, że strumień danych o zmianach nie został włączony.

W poniższych przykładach składni pokazano, jak używać opcji wersji początkowej i końcowej przy odczytach wsadowych.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Pyton

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Skala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Uwaga

Domyślnie, jeśli użytkownik przekaże wersję lub znacznik czasu, który przekracza ostatnie zatwierdzenie w tabeli, zostanie zgłoszony błąd timestampGreaterThanLatestCommit. W środowisku Databricks Runtime 11.3 LTS lub nowszym kanał danych zmiany może obsłużyć sytuację wersji spoza zakresu, jeśli użytkownik skonfiguruje następującą konfigurację na true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Jeśli podasz wersję początkową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu rozpoczęcia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji zwracany jest pusty wynik odczytu.

Jeśli podasz wersję końcową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu zakończenia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji w trybie odczytu wsadowego zostaną zwrócone wszystkie zmiany między wersją początkową a ostatnim zatwierdzeniem.

Jaki jest schemat zestawienia zmian danych?

Podczas odczytywania ze źródła danych zmian dla tabeli jest używany schemat najnowszej wersji tabeli.

Uwaga

Większość operacji zmiany schematu i ewolucji jest w pełni obsługiwana. Tabela z włączonym mapowaniem kolumn nie obsługuje wszystkich przypadków użycia i demonstruje różne zachowanie. Zobacz Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn.

Oprócz kolumn danych ze schematu tabeli delta zestawienie danych zawiera kolumny metadanych identyfikujące typ zdarzenia zmiany:

Nazwa kolumny Typ Wartości
_change_type Struna/Sznurek (if applicable) insert, update_preimage , update_postimagedelete(1)
_commit_version Długi Wersja dziennika Delta lub tabeli, która zawiera zmianę.
_commit_timestamp Sygnatura czasowa Sygnatura czasowa skojarzona podczas tworzenia zatwierdzenia.

(1)preimage jest wartością przed aktualizacją, postimage jest wartością po aktualizacji.

Uwaga

Nie można włączyć strumienia danych zmiany w tabeli, jeśli struktura zawiera kolumny o takich samych nazwach jak te dodane kolumny. Zmień nazwy kolumn w tabeli, aby rozwiązać ten konflikt przed próbą włączenia strumienia danych zmian.

Włącz strumień danych o zmianach

Zmiany w danych można odczytywać tylko dla tablic, które mają włączoną obsługę. Należy jawnie włączyć opcję zmiany strumienia danych przy użyciu jednej z następujących metod:

  • Nowa tabela: ustaw właściwość delta.enableChangeDataFeed = true tabeli w poleceniu CREATE TABLE .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Istniejąca tabela: ustaw właściwość delta.enableChangeDataFeed = true tabeli w poleceniu ALTER TABLE .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Wszystkie nowe tabele:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Ważne

Rejestrowane są tylko zmiany wprowadzone po włączeniu kanału danych o zmianach. Wcześniejsze zmiany w tabeli nie są przechwytywane.

Jak są rejestrowane zmiany danych?

Delta Lake próbuje rejestrować zmiany danych w najbardziej wydajny sposób i może wykorzystać inne funkcje Delta Lake włączone na tabeli, aby zoptymalizować sposób, w jaki zmiany danych są przedstawiane w pamięci.

Włączenie strumienia danych zmiennych może spowodować niewielki wzrost kosztów przechowywania tabeli. Wynika to z faktu, że zmiany danych mogą być rejestrowane w osobnych plikach. Niektóre operacje, takie jak operacje wstawiania i usuwanie pełnej partycji, nie generują plików danych zmian, ponieważ usługa Azure Databricks może efektywnie obliczyć zestawienie danych zmian bezpośrednio z dziennika transakcji.

Wszystkie pliki, które rejestrują zmiany danych, są zgodne z zasadami przechowywania tabeli. Pliki zmian danych są usuwane po uruchomieniu polecenia VACUUM, a zmiany są rekonstruowane z dziennika transakcji zgodnie z polityką zatrzymywania punktów kontrolnych.

Nie należy próbować rekonstruować zestawienia zmian przez bezpośrednie wykonywanie zapytań dotyczących plików rejestrujących zmiany danych. Zawsze używaj interfejsów API Delta Lake podczas pracy ze strumieniem danych zmian.

Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?

Zestawienie danych zmian nie ma służyć jako trwały rekord wszystkich zmian w tabeli. Zmiana źródła danych rejestruje tylko zmiany, które występują po jej włączeniu.

Kanał danych zmian i Delta Lake umożliwiają zawsze odtworzenie pełnej migawki tabeli źródłowej, co oznacza, że możesz rozpocząć nowy proces odczytu strumieniowego względem tabeli z włączonym kanałem danych zmian i przechwycić bieżącą wersję tej tabeli oraz wszystkie zmiany, które następują później.

Rekordy w kanale danych zmian należy traktować jako przejściowe i dostępne tylko dla określonego okna przechowywania. Dziennik transakcji Delta usuwa wersje tabel i odpowiadające im wersje strumienia zmian danych w regularnych odstępach czasu. Po usunięciu wersji z dziennika transakcji nie można już odczytać zestawienia danych zmian dla tej wersji.

Jeśli przypadek użycia wymaga zachowania trwałej historii wszystkich zmian w tabeli, należy użyć logiki przyrostowej do zapisywania rekordów ze źródła danych zmian do nowej tabeli. W poniższym przykładzie kodu pokazano użycie funkcjonalności trigger.AvailableNow, która wykorzystuje inkrementalne przetwarzanie w Strukturalnym Przesyłaniu Strumieniowym, ale przetwarza dostępne dane jako obciążenie wsadowe. Można zaplanować to obciążenie asynchronicznie przy użyciu głównych potoków przetwarzania, aby utworzyć kopię zapasową strumienia danych zmian do celów audytowych lub pełnej odtwarzalności.

Pyton

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Skala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn

Po włączeniu mapowania kolumn w tabeli delty można usuwać lub zmieniać nazwy kolumn w tabeli bez ponownego zapisywania plików danych dla istniejących danych. Po włączeniu mapowania kolumn zmiana źródła danych ma ograniczenia po wprowadzeniu zmian schematu nie addytywnego, takich jak zmiana nazwy lub usunięcie kolumny, zmiana typu danych lub zmiany wartości null.

Ważne

  • Nie można odczytać strumienia danych zmian dla transakcji lub zakresu, w którym następuje nieaddytywna zmiana schematu przy użyciu semantyki wsadowej.
  • W środowisku Databricks Runtime 12.2 LTS i wcześniejszych wersjach tabele z włączonym mapowaniem kolumn, które doświadczyły nieaddytywnych zmian schematu, nie obsługują odczytów przesyłanych strumieniowo na kanale zmiany danych. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
  • W wersji Databricks Runtime 11.3 LTS i wcześniejszych nie można odczytać strumienia zmian danych dla tabel z włączonym mapowaniem kolumn, które przeszły zmianę nazwy lub usunięcie kolumn.

W środowisku Databricks Runtime 12.2 LTS lub nowszym można wykonywać operacje odczytu wsadowego dla strumienia danych zmian dla tabel z włączonym mapowaniem kolumn, które doświadczyły zmian schematu, które nie są addytywne. Zamiast używać schematu najnowszej wersji tabeli, operacje odczytu używają schematu końcowej wersji tabeli określonej w zapytaniu. Zapytania nadal kończą się niepowodzeniem, jeśli określony zakres wersji obejmuje zmianę schematu nie addytywnego.