Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W przypadku obsługi dużych ilości danych potrzebna jest ścieżka przetwarzania, która przetwarza się jedynie nowe i zmienione rekordy danych, zamiast ponownie przetwarzać cały zestaw danych. Jest to nazywane przyrostowym ETL. W usłudze Databricks SQL można tworzyć przyrostowe potoki ETL przy użyciu tabel strumieniowych i zmaterializowanych widoków bez konieczności pisania kodu proceduralnego ani ręcznego planowania odświeżeń.
Ten samouczek przeprowadzi Cię przez typowy wzorzec: śledzenie zmian produktów w czasie. Możesz utworzyć tabelę źródłową, przechwycić zdarzenia zmian, utworzyć tabelę wymiarów, która zachowuje pełną historię każdego produktu i dodać zagregowaną warstwę raportowania na górze.
Kluczową funkcją w tym samouczku jest AUTO CDC. W tradycyjnym magazynie danych można napisać złożone zapytania MERGE INTO do uzgadniania zdarzeń wstawiania, aktualizowania i usuwania w tabeli docelowej. Takie podejście jest podatne na błędy, zwłaszcza gdy zdarzenia docierają w niewłaściwej kolejności.
AUTO CDC obsługuje to za Ciebie. Deklarujesz klucz biznesowy, kolumnę sekwencjonowania oraz określasz, czy chcesz typ SCD 1 (tylko najnowsza wartość) czy SCD 2 (pełna historia), a Azure Databricks automatycznie stosuje odpowiednią logikę scalania. Aby zapoznać się z przeglądem usługi CDC, zapoznaj się z AUTO CDC API: Upraszczanie przechwytywania zmian danych za pomocą potoków.
Po ukończeniu tego samouczka będziesz mieć następujące elementy:
- Utworzono tabelę źródłową, która śledzi zmiany za pomocą zestawienia zmian danych.
- Sprawdzono surowe dane dotyczące zmiany, aby zrozumieć strumień zdarzeń CDC.
- Użyto
AUTO CDCdo utworzenia tabeli wymiaru SCD typu 2 z tych zdarzeń. - Zdarzenia usuwania przetwarzane przyrostowo w przepływie danych.
- Utworzono zmaterializowany widok, który przyrostowo utrzymuje raport agregujący.
- Skonfigurowano
SCHEDULE REFRESH EVERY 1 DAY, aby zmiany propagowały się automatycznie w potoku.
Wymagania
Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:
- Obszar roboczy Azure Databricks z włączonym katalogiem Unity.
- Usługa SQL Warehouse (bezserwerowa lub pro).
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Bezserwerowe obliczenia włączone dla twojego konta. Zobacz funkcje o ograniczonej dostępności regionalnej.
Krok 1. Konfigurowanie katalogu i schematu
Otwórz edytor Databricks SQL i ustaw swój domyślny katalog oraz schemat. Musisz mieć uprawnienia do USE katalogu i wybranego schematu:
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Krok 2. Tworzenie tabeli źródłowej i ładowanie danych
Utwórz tabelę products z włączonym Użyj źródła danych zmian usługi Delta Lake na Azure Databricks (CDF). CDF to funkcja usługi Delta Lake, która rejestruje każde wstawianie, aktualizowanie i usuwanie jako dziennik zmian z możliwością wykonywania zapytań. Jest to podobne do strumienia CDC z transakcyjnego systemu źródłowego, z wyjątkiem tego, że zmiany są przechwytywane bezpośrednio w tabeli Delta, a nie z dziennika zewnętrznego. Używasz tutaj CDF, aby generować zdarzenia zmiany, które będą wykorzystywane przez dalszy potok.
Utwórz tabelę i załaduj początkowe rekordy:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Zasymuluj zmiany nadrzędne, w tym nowe produkty, przenoszenie magazynu i ponowne przypisywanie kategorii:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Krok 3. Wykonywanie zapytań dotyczących zestawienia danych zmian
Przed utworzeniem potoku przetwarzania danych warto zapoznać się z nieprzetworzonymi zdarzeniami zmian, aby zrozumieć, co AUTO CDC będzie przetwarzać. Funkcja table_changes() odczytuje dziennik CDF i zwraca każdą przechwyconą operację wraz z kolumnami metadanych:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Na przykład Spoon ma trzy wydarzenia: insert(Seattle), update_preimage(Seattle) i update_postimage(Los Angeles).
Zwróć uwagę, że pojedyncza zmiana logiczna (na przykład przeniesienie łyżki do innego magazynu) powoduje utworzenie wielu zdarzeń: preimage i postimage. W tradycyjnym magazynie napiszesz instrukcję MERGE, aby uzgodnić wszystkie te zdarzenia w tabeli docelowej, zajmować się wstawianiem, aktualizacją i usuwaniem przy użyciu oddzielnej logiki, oraz upewnić się, że zdarzenia są realizowane w odpowiedniej kolejności. Jest to dokładnie złożoność, która AUTO CDC eliminuje w następnym kroku.
Krok 4. Kompilowanie wymiaru typu SCD 2 za pomocą polecenia AUTO CDC
Ważna
AUTO CDC jest w wersji beta. Wymaga środowiska Databricks Runtime w wersji 17.3 lub nowszej.
Tabela strumieniowa przetwarza dane przyrostowo. Podczas każdego odświeżania odczytuje tylko nowe wiersze od ostatniego uruchomienia, więc nie musi ponownie przetwarzać pełnego zestawu danych. To sprawia, że dobrze nadaje się do dużych lub często zmieniających się źródeł.
AUTO CDC dodaje przetwarzanie przechwytywania zmian danych w górnej części tabeli przesyłania strumieniowego. Zamiast pisać instrukcję MERGE INTO, która ręcznie obsługuje operacje wstawiania, aktualizacji i usuwania, deklarujesz klucz biznesowy i kolumnę sekwencjonowania, a Azure Databricks stosować poprawną logikę.
AUTO CDC automatycznie obsługuje również zdarzenia poza kolejnością, co jest typowym problemem podczas przetwarzania zdarzeń przez MERGE INTO przychodzących z systemów rozproszonych lub obciążeń wsadowych z nakładającymi się znacznikami czasu.
Poniższa instrukcja tworzy tabelę SCD Type 2, która zachowuje pełną historię wersji każdego produktu. Każda wersja otrzymuje __START_AT i __END_AT sygnatury czasowe. Wartość NULL in __END_AT oznacza bieżącą wersję.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: odświeża tabelę zgodnie z harmonogramem dziennym. -
FLOW AUTO CDC: deklaruje to jako przepływ CDC. Azure Databricks automatycznie stosuje semantyka wstawiania, aktualizowania i usuwania. -
KEYS (product_id): klucz biznesowy. Zdarzenia z tym samym kluczem są scalane z wierszami w wersji. -
APPLY AS DELETE WHEN _change_type = 'delete': zamyka bieżącą wersję po nadejściu zdarzenia usuwania. Umożliwia to zdefiniowanie warunku identyfikującego zdarzenie usuwania. -
SEQUENCE BY _commit_timestamp: ustanawia kolejność zdarzeń. Poprawnie obsługuje przyloty poza zamówieniem. -
STORED AS SCD TYPE 2: zachowuje pełną historię.AUTO CDCobsługuje zarówno typ SCD 1, jak i typ SCD 2.
Zapytaj tabelę wymiarów.
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Łyżka: dwie wersje. Seattle (zamknięte,
__END_ATustawione) i Los Angeles (bieżące,__END_AT = NULL). - Fork: dwie wersje. Kategoria Sztućców (zamknięta) i Kategoria Jadalni (bieżąca).
- Serwetka i podstawka: po jednej wersji (nowo wstawiona,
__END_AT = NULL). - Wszystkie inne produkty: po jednej wersji każdy (
__END_AT = NULL).
Krok 5. Proces usuwania za pośrednictwem potoku
Teraz symuluj dwa wycofane produkty, usuwając je z tabeli źródłowej:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Te zdarzenia usuwania są rejestrowane w dzienniku CDF, ale tabela strumieniowa jeszcze ich nie zauważyła. Odśwież tabelę przesyłania strumieniowego, aby przetworzyć nowe zdarzenia:
REFRESH STREAMING TABLE products_history;
Wykonaj zapytanie w tabeli wymiarów, aby sprawdzić, czy usunięcia zostały zastosowane.
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Bowl i Glass zostały ostatecznie wycofane z __END_AT, oznaczając je jako wycofane. Wszystkie inne bieżące produkty pozostają niezmienione. Tabela przesyłania strumieniowego przetworzyła wyłącznie nowe zdarzenia usuwania, bez ponownego przetwarzania wstawień i aktualizacji z poprzedniego odświeżania.
Krok 6. Tworzenie zagregowanego zmaterializowanego widoku
Teraz, gdy masz tabelę wymiarów, która pozostaje aktualna ze zmianami źródła, możesz dodać warstwę raportowania na górze.
Zmaterializowany widok przechowuje wstępnie obliczone wyniki zapytania jako tabelę fizyczną. W przeciwieństwie do zwykłego widoku, który ponownie wykonuje zapytanie za każdym razem, gdy z niego odczytujesz, zmaterializowany widok utrwala wyniki i tylko ponownie skompiluje wiersze, na które mają wpływ zmiany nadrzędne w każdym odświeżeniu. Dzięki temu jest odpowiedni dla pulpitów nawigacyjnych i raportów, gdzie wydajność zapytań ma znaczenie.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY oznacza, że ten widok odświeża się zgodnie z harmonogramem dziennym. W połączeniu z tym samym harmonogramem w tabeli strumieniowej masz teraz trzyetapowy potok, w którym zmiany w tabeli źródłowej są przekazywane kaskadowo przez wymiar i docierają do agregatu w każdym cyklu odświeżania. Nie ma ręcznego odświeżania do uruchomienia.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Krok 7. Weryfikuj kaskadę od końca do końca
Aby sprawdzić pełną kaskadę potoku, wprowadź zmianę w tabeli źródłowej:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Nóż przenosi się z Denver do Seattle. Ta pojedyncza zmiana DML wyzwala pełną kaskadę potoku, pokazując, jak trzy etapy współpracują ze sobą:
-
productsrejestruje zdarzenie zmiany za pośrednictwem usługi CDF. -
products_historyprzetwarza zdarzenie i dodaje nową wersję dla aplikacji Knife. -
products_by_categoryprzelicza ponownie tylko dotknięty wiersz Cutlery.
Sprawdź:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Czyszczenie
Aby wyczyścić zasoby utworzone w tym samouczku, użyj następującego kodu SQL:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;