Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Při práci s velkými objemy dat potřebujete kanál, který může zpracovávat pouze nové a změněné záznamy místo opětovného zpracování celé datové sady. Tomu se říká přírůstkové ETL. V Databricks SQL můžete vytvářet přírůstkové kanály ETL pomocí streamovaných tabulek a materializovaných zobrazení, aniž byste museli psát procedurální kód nebo plánovat ruční aktualizace.
Tento kurz vás provede běžným vzorem: sledování změn produktů v průběhu času. Vytvoříte zdrojovou tabulku, zachytíte události změn, vytvoříte tabulku dimenzí, která zachovává úplnou historii každého produktu, a přidáte agregovanou vrstvu sestavování na vrch.
Klíčovou funkcí v tomto kurzu je AUTO CDC. V tradičním skladu byste napsali složité MERGE INTO příkazy pro odsouhlasení vložení, aktualizace a odstranění událostí do cílové tabulky. Tento přístup je náchylný k chybám, zejména když události přicházejí mimo pořadí.
AUTO CDC se o to stará za vás. Deklarujete obchodní klíč, sloupec sekvencování a to, jestli chcete SCD Type 1 (pouze nejnovější hodnota) nebo SCD Type 2 (úplná historie) a Azure Databricks použije správnou logiku sloučení automaticky. Přehled CDC najdete v tématu Automatizovaná rozhraní API pro CDC: Zjednodušení zachytávání změn údajů pomocí kanálů.
Na konci tohoto kurzu budete mít:
- Vytvoření zdrojové tabulky sledující změny pomocí kanálu změn dat.
- Prozkoumali nezpracovaná data změn, abyste porozuměli streamu událostí CDC.
- Použit
AUTO CDCk vytvoření tabulky dimenze typu 2 SCD z těchto událostí. - Zpracování mazacích událostí postupně prostřednictvím pipeline.
- Vytvořili jsme materializované zobrazení, které přírůstkově udržuje agregovanou sestavu.
- Nakonfigurováno
SCHEDULE REFRESH EVERY 1 DAYtak, aby se změny automaticky šířily prostřednictvím potrubí.
Požadavky
K dokončení tohoto kurzu musíte splnit následující požadavky:
- Pracovní prostor Azure Databricks s povoleným katalogem Unity Catalog.
- sklad SQL (bez serveru nebo pro).
- Máte oprávnění k vytvoření výpočetního prostředku nebo přístupu k výpočetnímu prostředku.
- Bezserverové výpočetní prostředky povolené pro váš účet. Viz Funkce s omezenou regionální dostupností.
Krok 1: Nastavení katalogu a schématu
Otevřete editor SQL Databricks a nastavte pracovní katalog a schéma. Musíte mít oprávnění k USE katalogu a schématu, které vyberete:
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Krok 2: Vytvoření zdrojové tabulky a načtení dat
Vytvořte products tabulku s povoleným Kanálem změn v Delta Lake na Azure Databricks (CDF). CDF je funkce Delta Lake, která zaznamenává každou vložení, aktualizaci a odstranění jako dotazovatelný protokol změn. To se podobá datovému proudu CDC z transakčního zdrojového systému, kromě toho, že změny se zaznamenávají přímo v tabulce Delta místo z externího protokolu. CDF zde použijete pro generování událostí změn, které bude využívat následující kanál.
Vytvořte tabulku a načtěte počáteční záznamy:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Simulujte upstreamové změny, včetně nových produktů, přesunu skladu a opětovného přiřazení kategorie:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Krok 3: Dotaz na datový tok změn
Před vytvořením následného kanálu pomáhá podívat se na surové události změn, abyste pochopili, co AUTO CDC bude zpracovávat. Funkce table_changes() čte protokol CDF a vrací všechny zachycené operace spolu se sloupci metadat:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Například Spoon má tři události: insert (Seattle), update_preimage (Seattle) a update_postimage (Los Angeles).
Všimněte si, že logická změna (například přesunutí Spoon do jiného skladu) vytváří více událostí: preimage a postimage. V tradičním skladu byste napsali MERGE příkaz pro odsouhlasení všech těchto událostí do cílové tabulky, zpracování vložení, aktualizací a odstranění pomocí samostatné logiky a zajištění, že se události použijí ve správném pořadí. To je přesně složitost, která AUTO CDC eliminuje v dalším kroku.
Krok 4: Sestavení dimenze TYPU 2 SCD pomocí AUTO CDC
Důležité
AUTO CDC je v beta verzi. Vyžaduje Databricks Runtime 17.3 nebo vyšší.
Streamovaná tabulka zpracovává data přírůstkově. Při každé aktualizaci čte pouze nové řádky od posledního spuštění, takže nemusí znovu zpracovat úplnou datovou sadu. Díky tomu je vhodná pro velké objemy nebo často se měnící zdroje.
AUTO CDC přidá zpracování zachytávání dat změn nad streamovací tabulku. Místo psaní příkazu MERGE INTO, který ručně zpracovává vkládání, aktualizace a odstranění, deklarujete sloupec obchodního klíče a sekvencování a necháte Azure Databricks použít správnou logiku.
AUTO CDC také zpracovává události mimo pořadí automaticky, což je běžný problém při zpracování MERGE INTO událostí přicházejících z distribuovaných systémů nebo dávkových zatížení s překrývajícími se časovými razítky.
Následující příkaz vytvoří tabulku SCD Type 2, která zachovává úplnou historii verzí každého produktu. Každá verze získá __START_AT a __END_AT časové razítko.
NULL A __END_AT označuje aktuální verzi.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: Aktualizuje tabulku podle denního plánu. -
FLOW AUTO CDC: deklaruje tok CDC. Azure Databricks automaticky uplatňuje sémantiku vložení, aktualizace a odstranění. -
KEYS (product_id): obchodní klíč. Události se stejným klíčem jsou sloučeny do verzovaných řádků. -
APPLY AS DELETE WHEN _change_type = 'delete': Zavře aktuální verzi, když přijde událost odstranění. To vám umožní definovat podmínku, která identifikuje událost odstranění. -
SEQUENCE BY _commit_timestamp: vytvoří řazení událostí. Zpracovává doručení mimo objednávku správně. -
STORED AS SCD TYPE 2: uchovává úplnou historii.AUTO CDCpodporuje SCD Type 1 i SCD Type 2.
Dotaz na tabulku dimenzí:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Lžička: dvě verze. Seattle (uzavřeno,
__END_ATnastavení) a Los Angeles (aktuální,__END_AT = NULL). - Fork: dvě verze. Kategorie příborů (uzavřená) a Kategorie Stravování (aktuální).
- Ubrousek a tácek: jedna verze každého (nově vložená,
__END_AT = NULL). - Všechny ostatní produkty: každá z nich má jednu verzi (
__END_AT = NULL).
Krok 5: Proces odstranění prostřednictvím kanálu
Nyní simulujte dva ukončené produkty jejich odstraněním ze zdrojové tabulky:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Tyto události odstranění jsou zaznamenány v protokolu CDF, ale streamovací tabulka je ještě neviděla. Aktualizujte streamovací tabulku, aby zpracovávala nové události:
REFRESH STREAMING TABLE products_history;
Zadejte dotaz na tabulku dimenzí a ověřte, že se použily odstranění:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Mísa a sklo jsou nyní označeny jako ukončené pomocí __END_AT sady, což je označuje jako vyřazené. Všechny ostatní aktuální produkty zůstávají beze změny. Streamovaná tabulka zpracovala pouze nové události odstranění bez opětovného zpracování vložení a aktualizací z předchozí aktualizace.
Krok 6: Vytvoření agregovaného materializovaného zobrazení
Teď, když máte dimenzionální tabulku, která je aktuální se zdrojovými změnami, můžete přidat vrstvu pro tvorbu reportů.
Materializované zobrazení ukládá předem vypočítané výsledky dotazu jako fyzickou tabulku. Na rozdíl od běžného zobrazení, které dotaz znovu spustí při každém čtení z něj, materializované zobrazení zachová výsledky a přepočítá pouze řádky ovlivněné nadřazenými změnami při každé aktualizaci. Díky tomu se dobře hodí pro řídicí panely a sestavy, kde záleží na výkonu dotazů.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY znamená, že toto zobrazení se aktualizuje podle denního plánu. V kombinaci se stejným plánem ve streamovací tabulce teď máte třífázový kanál, kde dochází ke kaskádovým změnám ve zdrojové tabulce přes dimenzi k agregaci v každém aktualizačním cyklu. Neexistuje žádná ruční aktualizace k provedení.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Krok 7: Ověření ucelené kaskády
Pokud chcete ověřit kaskádu celého kanálu, proveďte změnu zdrojové tabulky:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Nůž se přesune z Denveru do Seattlu. Tato jediná změna DML aktivuje kaskádu celého pipeline a demonstruje, jak tyto tři etapy spolupracují:
-
productszaznamenává událost změny prostřednictvím CDF. -
products_historyzpracuje událost a přidá novou verzi pro Nůž. -
products_by_categorypřepočítá pouze dotčený řádek Cutlery.
Ověřit:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Vyčištění
K vyčištění prostředků vytvořených v tomto kurzu použijte následující SQL:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;