Sdílet prostřednictvím


Vývoj kódu deklarativních kanálů Sparku Lakeflow pomocí SQL

Deklarativní kanály Sparku Lakeflow (SDP) zavádí několik nových klíčových slov a funkcí SQL pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora SQL pro vývoj kanálů vychází ze základů Spark SQL a přidává podporu funkcí strukturovaného streamování.

Uživatelé, kteří znají datové rámce PySpark, můžou preferovat vývoj kódu kanálu pomocí Pythonu. Python podporuje rozsáhlejší testování a operace, které jsou náročné na implementaci s SQL, jako jsou operace metaprogramování. Viz Vývoj kódu kanálu pomocíPythonu .

Úplnou referenci syntaxe Pipeline SQL najdete v referenční dokumentaci jazyka Pipeline SQL.

Základy SQL pro vývoj datových kanálů

Kód SQL, který vytváří datové sady kanálu, používá CREATE OR REFRESH syntaxi k definování materializovaných zobrazení a streamovaných tabulek proti výsledkům dotazu.

Klíčové slovo STREAM označuje, jestli má být zdroj dat odkazovaný v klauzuli SELECT přečten sémantikou streamování.

Ve výchozím nastavení se čtení a zápisy provádějí do katalogu a schématu zadaného během konfigurace kanálu. Viz Nastavení cílového katalogu a schématu.

Zdrojový kód kanálu se zásadně liší od skriptů SQL: SDP vyhodnocuje všechny definice datových sad ve všech souborech zdrojového kódu nakonfigurovaných v kanálu a před spuštěním jakýchkoli dotazů sestaví graf toku dat. Pořadí dotazů zobrazených ve zdrojových souborech definuje pořadí vyhodnocení kódu, ale ne pořadí provádění dotazů.

Vytvoření materializovaného zobrazení pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializovaného zobrazení pomocí SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Vytvoření streamované tabulky pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření streamovací tabulky pomocí SQL. Při čtení zdroje pro streamovací tabulku klíčové slovo STREAM označuje, že se pro zdroj používá sémantika streamování. Nepoužívejte klíčové slovo STREAM při vytváření materializovaného zobrazení.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Poznámka:

Pomocí klíčového slova STREAM můžete ke čtení ze zdroje použít sémantiku streamování. Pokud čtení narazí na změnu nebo odstranění existujícího záznamu, vyvolá se chyba. Je nejbezpečnější číst ze statických nebo doplňovacích zdrojů. K ingestci dat, která mají commity změn, můžete použít Python a možnost SkipChangeCommits pro zpracování chyb.

Načtení dat z úložiště objektů

Kanály podporují načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.

Poznámka:

Tyto příklady používají data dostupná v /databricks-datasets automaticky připojená k vašemu pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Viz Co jsou svazky katalogu Unity?.

Databricks doporučuje používat Auto Loader a streamovací tabulky při konfiguraci úloh pro inkrementální příjem dat z dat uložených v cloudovém objektovém úložišti. Podívejte se na Co je to Auto Loader?

SQL používá funkci read_files k aktivaci funkce Auto Loader. Musíte také použít klíčové slovo STREAM ke konfiguraci streamovaného čtení pomocí read_files.

Následující popisuje syntaxi pro read_files v SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Možnosti automatického zavaděče jsou páry klíčů a hodnot. Podrobnosti o podporovaných formátech a možnostech najdete v tématu Možnosti.

Následující příklad ukazuje, jak vytvořit streamovací tabulku ze souborů JSON pomocí Auto Loaderu:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Funkce read_files také podporuje dávkovou sémantiku pro vytváření materializovaných zobrazení. Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Ověření dat s očekáváními

Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Viz Spravujte kvalitu dat pomocí požadavků na datový potrubí.

Následující kód definuje očekávanou pojmenovanou valid_data, která během příjmu dat zahodí záznamy, které mají hodnotu null:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Provádění dotazů na materializovaná zobrazení a streamované tabulky definované v datovém potrubí

Následující příklad definuje čtyři datové sady:

  • Streamovaná tabulka s názvem orders, která načítá data JSON.
  • Materializované zobrazení s názvem customers, které načte data CSV.
  • Materializované zobrazení s názvem customer_orders, které spojuje záznamy z datových sad orders a customers, přetypuje časové razítko objednávky na datum a vybere pole customer_id, order_number, statea order_date.
  • Materializované zobrazení s názvem daily_orders_by_state, které agreguje denní počet objednávek pro každý stav.

Poznámka:

Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders, customersa customer_orders zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.

Starší způsob publikace používá schéma LIVE k dotazování jiných materializovaných zobrazení a streamovaných tabulek definovaných ve vašem datovém toku. V nových potrubích je syntaxe schématu LIVE ignorována bez upozornění. Viz LIVE schema (starší verze).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Definování privátní tabulky

Klauzuli PRIVATE můžete použít při vytváření materializovaného zobrazení nebo tabulky streamování. Při vytváření privátní tabulky vytvoříte tabulku, ale nevytvoříte metadata tabulky. Klauzule PRIVATE dává SDP pokyn k vytvoření tabulky, která je dostupná pro kanál, ale neměla by být přístupná mimo kanál. Aby se zkrátila doba zpracování, soukromá tabulka přetrvává po celou dobu životnosti kanálu, který ji vytvoří, a netýká se to jen jedné samotné aktualizace.

Soukromé tabulky můžou mít stejný název jako tabulky v katalogu. Pokud zadáte nekvalifikovaný název tabulky v rámci kanálu a existuje jak soukromá tabulka, tak tabulka katalogu s tímto názvem, použije se soukromá tabulka.

Soukromé tabulky byly dříve označovány jako dočasné tabulky.

Trvalé odstranění záznamů z materializovaného zobrazení nebo tabulky streamování

Pokud chcete trvale odstranit záznamy ze streamované tabulky s povolenými vektory odstranění, jako je například dodržování gdpr, je nutné provést další operace u podkladových tabulek Delta objektu. Pokud chcete zajistit odstranění záznamů z tabulky streamování, přečtěte si téma Trvalé odstranění záznamů z tabulky streamování.

Materializovaná zobrazení vždy odrážejí data v podkladových tabulkách při jejich aktualizaci. Pokud chcete odstranit data v materializovaném zobrazení, musíte odstranit data ze zdroje a aktualizovat materializované zobrazení.

Parametrizace hodnot použitých při deklarování tabulek nebo zobrazení pomocí SQL

Pomocí SET můžete zadat hodnotu konfigurace v dotazu, který deklaruje tabulku nebo zobrazení, včetně konfigurací Sparku. Libovolná tabulka nebo zobrazení, které definujete ve zdrojovém souboru po příkazu SET, má přístup k definované hodnotě. Všechny konfigurace Sparku zadané pomocí příkazu SET se použijí při provádění dotazu Spark pro libovolnou tabulku nebo zobrazení za příkazem SET. Ke čtení konfigurační hodnoty v dotazu použijte syntaxi interpolace řetězců ${}. Následující příklad nastaví konfigurační hodnotu Sparku s názvem startDate a použije ji v dotazu:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Pokud chcete zadat více hodnot konfigurace, použijte pro každou hodnotu samostatný příkaz SET.

Omezení

Klauzule PIVOT není podporována. Operace pivot ve Sparku vyžaduje dychtivé načítání vstupních dat pro výpočet výstupního schématu. Tato funkce není v kanálech podporovaná.

Poznámka:

Syntaxe CREATE OR REFRESH LIVE TABLE pro vytvoření materializovaného zobrazení je zastaralá. Místo toho použijte CREATE OR REFRESH MATERIALIZED VIEW.