Referenční informace k jazyku SQL delta Live Tables

Tento článek obsahuje podrobnosti o programovacím rozhraní SQL Delta Live Tables.

  • Informace o rozhraní Python API najdete v referenční dokumentaci jazyka Pythonu delta Live Tables.
  • Další informace o příkazech SQL naleznete v referenční dokumentaci jazyka SQL.

Uživatelem definované funkce Pythonu (UDF) můžete použít v dotazech SQL, ale tyto uživatelem definované funkce musíte definovat v souborech Pythonu, než je budete volat ve zdrojových souborech SQL. Viz uživatelem definované skalární funkce – Python.

Omezení

Klauzule PIVOT není podporována. Operace pivot ve Sparku vyžaduje dychtivé načítání vstupních dat pro výpočet schématu výstupu. Tato funkce není v dynamických tabulkách Delta podporovaná.

Vytvoření materializovaného zobrazení nebo streamované tabulky Delta Live Tables

Stejnou základní syntaxi SQL použijete při deklarování tabulky streamování nebo materializovaného zobrazení (označovaného také jako LIVE TABLE).

Streamované tabulky můžete deklarovat pouze pomocí dotazů, které se čtou proti zdroji streamování. Databricks doporučuje použít automatický zavaděč pro příjem streamovaných souborů z cloudového úložiště objektů. Viz syntaxe SQL automatického zavaděče.

Funkci kolem názvu datové sady musíte zahrnout STREAM() při zadávání jiných tabulek nebo zobrazení v kanálu jako zdroje streamování.

Následující popis syntaxe pro deklarování materializovaných zobrazení a streamovaných tabulek pomocí SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Vytvoření zobrazení Delta Live Tables

Následující popis syntaxe pro deklarování zobrazení pomocí SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Syntaxe SQL automatického zavaděče

Následující popis syntaxe pro práci s automatickým zavaděčem v SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

U automatického zavaděče můžete použít podporované možnosti formátování. map() Pomocí funkce můžete metodě předat libovolný počet možnostícloud_files(). Možnosti jsou páry klíč-hodnota, kde klíče a hodnoty jsou řetězce. Podrobnosti o formátech a možnostech podpory najdete v tématu Možnosti formátu souboru.

Příklad: Definování tabulek

Datovou sadu můžete vytvořit čtením z externího zdroje dat nebo datových sad definovaných v kanálu. Pokud chcete číst z interní datové sady, předpřipravené LIVE klíčové slovo na název datové sady. Následující příklad definuje dvě různé datové sady: tabulku s názvem taxi_raw , která přebírá soubor JSON jako vstupní zdroj a tabulku s názvem filtered_data , která přijímá taxi_raw tabulku jako vstup:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Příklad: Čtení ze zdroje streamování

Pokud chcete číst data ze zdroje streamování, například automatického zavaděče nebo interní datové sady, definujte STREAMING tabulku:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Další informace o streamovaných datech najdete v tématu Transformace dat pomocí delta živých tabulek.

Řízení způsobu materializace tabulek

Tabulky také nabízejí další kontrolu nad jejich materializací:

  • Určete, jak se tabulky rozdělují pomocí PARTITIONED BY. K urychlení dotazů můžete použít dělení.
  • Vlastnosti tabulky můžete nastavit pomocí TBLPROPERTIES. Viz Vlastnosti tabulky Delta Live Tables.
  • Nastavte umístění úložiště pomocí LOCATION nastavení. Ve výchozím nastavení se data tabulky ukládají v umístění úložiště kanálu, pokud LOCATION nejsou nastavená.
  • V definici schématu můžete použít vygenerované sloupce . Viz příklad: Zadejte schéma a sloupce oddílů.

Poznámka:

U tabulek, které mají velikost menší než 1 TB, databricks doporučuje řídit organizaci dat Delta Live Tables. Pokud neočekáváte, že tabulka roste nad rámec terabajtu, obecně byste neměli zadávat sloupce oddílů.

Příklad: Zadání schématu a sloupců oddílů

Volitelně můžete zadat schéma při definování tabulky. Následující příklad určuje schéma cílové tabulky, včetně použití vygenerovaných sloupců Delta Lake a definování sloupců oddílů pro tabulku:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Delta Live Tables ve výchozím nastavení odvodí schéma z table definice, pokud nezadáte schéma.

Příklad: Definování omezení tabulky

Poznámka:

Podpora tabulek Delta Live Tables je ve verzi Public Preview. Pokud chcete definovat omezení tabulek, musí být kanál s podporou katalogu Unity a nakonfigurovaný tak, aby používal kanál preview .

Při zadávání schématu můžete definovat primární a cizí klíče. Omezení jsou informativní a nevynucují se. Následující příklad definuje tabulku s omezením primárního a cizího klíče:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Nastavení konfiguračních hodnot pro tabulku nebo zobrazení

Slouží SET k zadání konfigurační hodnoty pro tabulku nebo zobrazení, včetně konfigurací Sparku. Libovolná tabulka nebo zobrazení, které definujete v poznámkovém bloku po SET příkazu, má přístup k definované hodnotě. Všechny konfigurace Sparku zadané pomocí příkazu SET se použijí při provádění dotazu Sparku pro libovolnou tabulku nebo zobrazení za příkazem SET. Ke čtení konfigurační hodnoty v dotazu použijte syntaxi ${}interpolace řetězců . Následující příklad nastaví hodnotu konfigurace Sparku s názvem startDate a použije ji v dotazu:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Pokud chcete zadat více hodnot konfigurace, použijte pro každou hodnotu samostatný SET příkaz.

Vlastnosti SQL

CREATE TABLE or VIEW
TEMPORARY

Vytvořte tabulku, ale nepublikujte metadata tabulky. Klauzule TEMPORARY dává Delta Live Tables pokyn k vytvoření tabulky, která je dostupná pro kanál, ale neměla by být přístupná mimo kanál. Aby se zkrátila doba zpracování, dočasná tabulka se zachová po celou dobu životnosti kanálu, který ho vytvoří, a ne jenom jednu aktualizaci.
STREAMING

Vytvořte tabulku, která čte vstupní datovou sadu jako datový proud. Vstupní datová sada musí být streamovaným zdrojem dat, například automatický zavaděč nebo STREAMING tabulka.
PARTITIONED BY

Volitelný seznam jednoho nebo více sloupců, které se mají použít k dělení tabulky.
LOCATION

Volitelné umístění úložiště pro data tabulky. Pokud není nastavená, systém ve výchozím nastavení nastaví umístění úložiště kanálu.
COMMENT

Volitelný popis tabulky.
column_constraint

Volitelné omezení primárního nebo cizího klíče ve sloupci.
table_constraint

Volitelné omezení primárního nebo cizího klíče v tabulce.
TBLPROPERTIES

Volitelný seznam vlastností tabulky pro tabulku.
select_statement

Dotaz Delta Live Tables, který definuje datovou sadu pro tabulku.
Klauzule CONSTRAINT
EXPECT expectation_name

Definujte omezení expectation_namekvality dat . Pokud ON VIOLATION není definováno omezení, přidejte řádky, které porušují omezení cílové datové sady.
ON VIOLATION

Nepovinná akce, která se má provést pro neúspěšné řádky:

* FAIL UPDATE: Okamžitě zastavte spuštění kanálu.
* DROP ROW: Zahoďte záznam a pokračujte ve zpracování.

Změna zachytávání dat pomocí SQL v rozdílových živých tabulkách

APPLY CHANGES INTO Příkaz použijte k používání funkcí CDC delta live tables, jak je popsáno v následujících příkladech:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Omezení kvality dat pro APPLY CHANGES cíl definujete pomocí stejné CONSTRAINT klauzule jako dotazyAPPLY CHANGES . Viz Správa kvality dat pomocí rozdílových živých tabulek.

Poznámka:

Výchozí chování INSERT a UPDATE události slouží k přenesení událostí CDC ze zdroje: aktualizujte všechny řádky v cílové tabulce, které odpovídají zadaným klíčům, nebo vložte nový řádek, pokud v cílové tabulce neexistuje odpovídající záznam. Zpracování událostí DELETE lze zadat pomocí APPLY AS DELETE WHEN podmínky.

Důležité

Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu APPLY CHANGES cílové tabulky musíte zahrnout __START_AT__END_AT i sloupce se stejným datovým typem jako sequence_by pole.

Viz APPLY CHANGES API: Zjednodušení zachytávání dat změn v rozdílových živých tabulkách.

Klauzule
KEYS

Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce.

Tato klauzule je povinná.
IGNORE NULL UPDATES

Povolit ingestování aktualizací obsahujících podmnožinu cílových sloupců Pokud se událost CDC shoduje s existujícím řádkem a je zadána možnost IGNOROVAT AKTUALIZACE s hodnotou NULL, sloupce se null zachovají své stávající hodnoty v cíli. To platí také pro vnořené sloupce s hodnotou null.

Tato klauzule je nepovinná.

Výchozí hodnotou je přepsání existujících sloupců null hodnotami.
APPLY AS DELETE WHEN

Určuje, kdy se má událost CDC považovat za událost DELETE , nikoli jako upsert. Aby bylo možné zpracovat data mimo pořadí, odstraněný řádek se dočasně zachová jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které vyfiltruje tyto náhrobky. Interval uchovávání informací je možné nakonfigurovat pomocí
pipelines.cdc.tombstoneGCThresholdInSecondsvlastnost table.

Tato klauzule je nepovinná.
APPLY AS TRUNCATE WHEN

Určuje, kdy má být událost CDC považována za úplnou tabulku TRUNCATE. Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci.

Klauzule APPLY AS TRUNCATE WHEN je podporována pouze pro SCD typu 1. ScD typu 2 nepodporuje zkrácení.

Tato klauzule je nepovinná.
SEQUENCE BY

Název sloupce určující logické pořadí událostí CDC ve zdrojových datech. Delta Live Tables používá toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí.

Tato klauzule je povinná.
COLUMNS

Určuje podmnožinu sloupců, které se mají zahrnout do cílové tabulky. Máte tyto možnosti:

* Zadejte úplný seznam sloupců, které mají být zahrnuty: COLUMNS (userId, name, city).
* Zadejte seznam sloupců, které chcete vyloučit: COLUMNS * EXCEPT (operation, sequenceNum)

Tato klauzule je nepovinná.

Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud COLUMNS není klauzule zadána.
STORED AS

Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2.

Tato klauzule je nepovinná.

Výchozí hodnota je SCD typu 1.
TRACK HISTORY ON

Určuje podmnožinu výstupních sloupců pro generování záznamů historie, pokud dojde k nějakým změnám těchto zadaných sloupců. Máte tyto možnosti:

* Zadejte úplný seznam sloupců, které chcete sledovat: COLUMNS (userId, name, city).
* Zadejte seznam sloupců, které mají být vyloučeny ze sledování: COLUMNS * EXCEPT (operation, sequenceNum)

Tato klauzule je nepovinná. Výchozí hodnota je sledovat historii všech výstupních sloupců, pokud dojde ke změnám, které jsou ekvivalentní TRACK HISTORY ON *.