Sdílet prostřednictvím


CREATE STREAMING TABLE

Platí pro:zaškrtnuto ano Databricks SQL

Vytvoří streamovací tabulku, tabulku Delta s dodatečnou podporou streamování nebo přírůstkového zpracování dat.

Streamované tabulky jsou podporovány pouze v deklarativních kanálech Lakeflow Spark a v Databricks SQL s Unity Catalog. Spuštěním tohoto příkazu na podporovaných výpočetních prostředcích Databricks Runtime se analyzuje pouze syntaxe. Viz Vývoj kódu deklarativních kanálů Sparku Lakeflow pomocí SQL.

Syntaxe

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Parametry

  • REFRESH

    Pokud je zadáno, aktualizuje tabulku nejnovějšími daty dostupnými ze zdrojů definovaných v dotazu. Pouze nová data, která přicházejí před zahájením dotazu, budou zpracována. Nová data, která se přidají do zdrojů během provádění příkazu, se ignorují až do další aktualizace. Operace aktualizace z příkazu CREATE OR REFRESH je plně deklarativní. Pokud příkaz aktualizace nezadá všechna metadata z původního příkazu pro vytvoření tabulky, odstraní se nezadaná metadata.

  • POKUD NEEXISTUJE

    Vytvoří tabulku streamování, pokud neexistuje. Pokud tabulka podle tohoto názvu již existuje, příkaz CREATE STREAMING TABLE bude ignorován.

    Můžete zadat nejvýše jednu z možností IF NOT EXISTS nebo OR REFRESH.

  • table_name

    Název tabulky, kterou chcete vytvořit. Název nesmí obsahovat dočasnou specifikaci ani specifikaci možností. Pokud název není kvalifikovaný, vytvoří se tabulka v aktuálním schématu.

  • specifikace_tabulek

    Tato volitelná klauzule definuje seznam sloupců, jejich typů, vlastností, popisů a omezení sloupců.

    Pokud nedefinujete sloupce ve schématu tabulky, je nutné zadat AS query.

    • column_identifier

      Jedinečný název sloupce

      • column_type

        Určuje datový typ sloupce.

      • NOT NULL

        Pokud je zadaný, sloupec nepřijímá hodnoty NULL.

      • COLUMN_COMMENT komentář

        Textový řetězec, který popisuje sloupec.

      • column_constraint

        Důležité

        Tato funkce je ve verzi Public Preview.

        Přidá omezení primárního klíče nebo cizího klíče ke sloupci ve streamované tabulce. Omezení nejsou podporována pro tabulky v katalogu hive_metastore.

      • Klauzule MASKA

        Přidá funkci masky sloupce pro anonymizaci citlivých dat. Všechny následné dotazy z tohoto sloupce obdrží výsledek vyhodnocení této funkce místo původní hodnoty sloupce. To může být užitečné pro jemné řízení přístupu, kde může funkce zkontrolovat identitu nebo členství ve skupinách volajícího uživatele a rozhodnout se, zda se má hodnota redigovat.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ PŘI PORUŠENÍ { SELHAT UPDATE | SMAZAT ŘÁDEK } ]

        Přidá do tabulky očekávání kvality dat. Tato očekávání kvality dat je možné sledovat v průběhu času a k nim přistupovat prostřednictvím protokolu událostí streamované tabulky. Očekávání FAIL UPDATE způsobí selhání zpracování při tvorbě a aktualizaci tabulky. Očekávání DROP ROW způsobí, že pokud se očekávání nesplní, celý řádek bude vynechán.

        expectation_expr se můžou skládat z literálů, identifikátorů sloupců v tabulce a deterministických integrovaných funkcí nebo operátorů SQL s výjimkou:

        Také expr nesmí obsahovat žádný poddotaz.

      • omezení tabulky

        Důležité

        Tato funkce je ve verzi Public Preview.

        Přidá omezení informačního primárního klíče nebo informačního cizího klíče do streamovací tabulky. U tabulek v katalogu hive_metastore nejsou podporována klíčová omezení.

  • tabulka_podmínky

    Volitelně můžete zadat dělení, komentáře, uživatelem definované vlastnosti a plán aktualizace nové tabulky. Každou dílčí klauzuli lze zadat pouze jednou.

    • DĚLENO PODLE

      Volitelný seznam sloupců tabulky, podle kterých chcete tabulku rozdělit.

      Poznámka:

      Liquid Clustering poskytuje flexibilní a optimalizované řešení pro kategorizaci. Zvažte použití CLUSTER BY místo PARTITIONED BY pro tabulky streamování.

    • CLUSTER BY

      Volitelná klauzule pro shlukování podle podmnožiny sloupců. Pomocí automatického clusteringu liquid s CLUSTER BY AUTOa Databricks inteligentně vybírá klíče clusteringu pro optimalizaci výkonu dotazů. Viz Použití metody 'liquid clustering' pro tabulky.

      Kapalinové shlukování nelze kombinovat s PARTITIONED BY.

    • KOMENTÁŘ table_comment

      Literál, který popisuje tabulku, je STRING.

    • VÝCHOZÍ KOLACE UTF8_BINARY

      Platí pro:check označený jako ano Kontrola SQL Databricks označená jako ano Databricks Runtime 17.1 a vyšší

      Vynutí výchozí řazení streamovací tabulky na UTF8_BINARY. Tato klauzule je povinná, pokud schéma, ve kterém je tabulka vytvořena, má jinou výchozí kolaci než UTF8_BINARY. Výchozí kolace streamované tabulky se používá jako výchozí kolace v rámci query a pro typy sloupců.

    • TBLPROPERTIES

      Volitelně nastaví jednu nebo více uživatelem definovaných vlastností.

      Toto nastavení použijte k určení kanálu modulu runtime Deklarativní pipelines Lakeflow Spark, který se používá ke spuštění tohoto příkazu. Nastavte hodnotu vlastnosti pipelines.channel na "PREVIEW" nebo "CURRENT". Výchozí hodnota je "CURRENT". Další informace o kanálech deklarativních kanálů Spark Sparku najdete v tématu Kanály runtime deklarativních kanálů Spark Sparku.

    • rozvrh

      Plán může být buď příkaz, SCHEDULE nebo TRIGGER příkaz.

      • HARMONOGRAM [ REFRESH ] příkaz_plánu

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Pokud chcete naplánovat aktualizaci, která se pravidelně provádí, použijte EVERY syntaxi. Pokud je zadaná syntaxe EVERY, aktualizuje se streamovaná tabulka nebo materializované zobrazení pravidelně v zadaném intervalu na základě zadané hodnoty, například HOUR, HOURS, DAY, DAYS, WEEKnebo WEEKS. V následující tabulce jsou uvedeny celočíselné hodnoty pro number.

          Časová jednotka Celočíselná hodnota
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 < = W < = 8

          Poznámka:

          Jednotné a množné číslo zahrnuté časové jednotky jsou sémanticky ekvivalentní.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Chcete-li naplánovat aktualizaci pomocí hodnoty quartz cron. Platné time_zone_values jsou přijímány. AT TIME ZONE LOCAL není podporováno.

          Jestliže AT TIME ZONE chybí, použije se časové pásmo relace. Pokud AT TIME ZONE chybí a časové pásmo relace není nastavené, je vyvolána chyba. SCHEDULE je sémanticky ekvivalentní SCHEDULE REFRESH.

        Plán lze zadat jako součást CREATE příkazu. Pomocí ALTER STREAMING TABLE nebo spuštěním příkazu CREATE OR REFRESH s klauzulí SCHEDULE můžete po vytvoření změnit plán tabulky streamování.

      • TRIGGER ON UPDATE [ NEJČASTĚJI KAŽDÝ trigger_interval ]

        Důležité

        Funkce TRIGGER ON UPDATE je v beta verzi.

        Volitelně můžete tabulku nastavit tak, aby se aktualizovala, když se aktualizuje nadřazený zdroj dat, a to maximálně jednou za minutu. Nastavte hodnotu tak AT MOST EVERY , aby vyžadovala alespoň minimální čas mezi aktualizacemi.

        Nadřazené zdroje dat musí být externí nebo spravované tabulky Delta (včetně materializovaných zobrazení nebo streamovaných tabulek) nebo spravovaná zobrazení, jejichž závislosti jsou omezené na podporované typy tabulek.

        Povolení událostí souborů může zvýšit výkon triggerů a zvýšit některé limity pro aktualizace triggerů.

        Jedná se trigger_interval o příkaz INTERVAL , který je alespoň 1 minuta.

        TRIGGER ON UPDATE má následující omezení

        • Maximálně 10 nadřazených zdrojů dat na tabulku streamování při použití TRIGGER ON UPDATE.
        • Pomocí triggeru ON UPDATElze zadat maximálně 1000 streamovaných tabulek nebo materializovaných zobrazení.
        • Výchozí AT MOST EVERY hodnota klauzule je 1 minuta a nesmí být menší než 1 minuta.
  • Klauzule WITH ROW FILTER

    Přidá do tabulky funkci filtru řádků. Všechny následné dotazy z této tabulky obdrží podmnožinu řádků, ve kterých se funkce vyhodnotí jako PRAVDA. To může být užitečné pro podrobné řízení přístupu, kdy může funkce zkontrolovat identitu nebo členství ve skupinách vyvolávajícího uživatele a rozhodnout se, jestli se mají určité řádky filtrovat.

  • dotaz

    Tato klauzule naplní tabulku pomocí dat z query. Tento dotaz musí být streamovací. Toho lze dosáhnout přidáním klíčového STREAM slova do libovolného vztahu, který chcete zpracovat přírůstkově. Když zadáte query a table_specification dohromady, schéma tabulky zadané v table_specification musí obsahovat všechny sloupce vrácené query, jinak se zobrazí chyba. Všechny sloupce uvedené v table_specification, které nejsou vráceny query, vracejí null hodnoty při dotazu.

Rozdíly mezi streamovanými tabulkami a jinými tabulkami

Streamované tabulky jsou stavové tabulky navržené tak, aby zpracovávaly každý řádek pouze jednou při zpracování rostoucí datové sady. Vzhledem k tomu, že většina datových sad v průběhu času roste, jsou streamované tabulky vhodné pro většinu úloh příjmu dat. Tabulky streamování jsou optimální pro kanály, které vyžadují aktuálnost dat a nízkou latenci. Streamované tabulky můžou být také užitečné pro masivní transformace škálování, protože výsledky se dají postupně vypočítat při příchodu nových dat, přičemž výsledky budou aktuální, aniž by bylo nutné plně překompilovat všechna zdrojová data s každou aktualizací. Streamovací tabulky jsou navrženy pro datové zdroje, které umožňují pouze přidávání nových dat.

Streamované tabulky přijímají další příkazy, jako je REFRESH, které zpracovávají nejnovější data dostupná ve zdrojích poskytovaných v dotazu. Změny zadaného dotazu se projeví pouze u nových dat voláním REFRESH, nikoli dříve zpracovaných dat. Pokud chcete změny použít i u existujících dat, musíte spustit REFRESH TABLE <table_name> FULL, abyste provedli FULL REFRESH. Úplné aktualizace znovu zpracovávají všechna data dostupná ve zdroji s nejnovější definicí. Nedoporučuje se volat úplné aktualizace zdrojů, které nezachovají celou historii dat nebo mají krátké doby uchovávání, například Kafka, protože úplná aktualizace zkracuje stávající data. Pokud už data nejsou ve zdroji dostupná, možná nebudete moct obnovit stará data.

Filtry řádků a masky sloupců

Filtry řádků umožňují zadat funkci, která se použije jako filtr při každém načtení řádků v tabulce. Tyto filtry zajišťují, aby následné dotazy vracely pouze řádky, pro které se predikát filtru vyhodnotí jako true.

Masky sloupců umožňují maskovat hodnoty sloupce pokaždé, když tabulka načte řádky. Všechny budoucí dotazy týkající se tohoto sloupce obdrží výsledek vyhodnocení funkce nad sloupcem a nahrazení původní hodnoty sloupce.

Další informace o tom, jak používat filtry řádků a masky sloupců, najdete v tématu Filtry řádků a masky sloupců.

Správa filtrů řádků a masek sloupců

Filtry řádků a masky sloupců u streamovaných tabulek by měly být přidány, aktualizovány nebo vynechány prostřednictvím příkazu CREATE OR REFRESH.

Chování

  • Refresh as Definer: Když příkazy CREATE OR REFRESH a REFRESH aktualizují streamovací tabulku, funkce filtru řádků se spustí s právy definovatele (jako vlastník tabulky). To znamená, že aktualizace tabulky používá kontext zabezpečení uživatele, který vytvořil streamovací tabulku.
  • Dotaz: Zatímco většina filtrů běží s právy defineru, funkce, které kontrolují kontext uživatele (například CURRENT_USER a IS_MEMBER) jsou výjimky. Tyto funkce se spouští jako spouštěč. Tento přístup vynucuje zabezpečení dat a řízení přístupu specifické pro uživatele na základě kontextu aktuálního uživatele.

Pozorovatelnost

Pomocí DESCRIBE EXTENDED, INFORMATION_SCHEMAnebo Průzkumníka katalogu můžete prozkoumat existující filtry řádků a masky sloupců, které platí pro danou streamovací tabulku. Tato funkce umožňuje uživatelům auditovat a kontrolovat přístup k datům a míry ochrany u streamovaných tabulek.

Omezení

  • Nejnovější data můžou získat jenom vlastníci tabulek, kteří můžou aktualizovat streamované tabulky.
  • ALTER TABLE příkazy jsou u streamovaných tabulek zakázány. Definice a vlastnosti tabulky by se měly změnit prostřednictvím příkazu CREATE OR REFRESH nebo ALTER STREAMING TABLE.
  • Vývoj schématu tabulky pomocí příkazů DML, jako je INSERT INTO, a MERGE se nepodporuje.
  • U streamovaných tabulek se nepodporují následující příkazy:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Sdílení Delta není podporováno.
  • Přejmenování tabulky nebo změna vlastníka se nepodporuje.
  • Omezení tabulek, jako PRIMARY KEY a FOREIGN KEY, nejsou podporována pro streamované tabulky v katalogu hive_metastore.
  • Vygenerované sloupce, sloupce identit a výchozí sloupce se nepodporují.

Příklady

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')