CREATE STREAMING TABLE (csővezetékek)

A streamelő táblák olyan táblák, amely támogatja a streamelést vagy a növekményes adatfeldolgozást. A streamelési táblákat csővezetékek támasztják alá. A streamelési táblák minden frissítésekor a forrástáblákhoz hozzáadott adatok hozzá lesznek fűzve a streamelési táblához. A streamelési táblákat manuálisan vagy ütemezés szerint frissítheti.

Ha többet szeretne megtudni a frissítések végrehajtásáról vagy ütemezéséről, olvassa el a Folyamatfrissítés futtatása című témakört.

Szemantika

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]
   } [ ... ]

flow_clause
  FLOW { { INSERT [ONCE] BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

Paraméterek

  • REFRESH

    Ha meg van adva, létrehozza a táblát, vagy frissít egy meglévő táblát és annak tartalmát.

  • PRIVÁT

    Létrehoz egy privát streamelési táblát.

    • Ezek nem lettek hozzáadva a katalógushoz, és csak a definiáló folyamaton belül érhetők el
    • Ugyanazzal a névvel rendelkezhetnek, mint egy meglévő objektum a katalógusban. A csővezetéken belül, ha egy privát streamelési tábla és egy katalógusbeli objektum neve megegyezik, a névre mutató hivatkozások a privát streamelési táblára kerülnek feloldásra.
    • A privát streamelési táblák csak a folyamat teljes élettartama alatt vannak megőrzve, nem csak egyetlen frissítéssel.

    A privát streamelési táblákat korábban a TEMPORARY paraméterrel hozták létre.

  • table_name

    Az újonnan létrehozott tábla neve. A teljes táblanévnek egyedinek kell lennie.

  • táblázat_specifikáció

    Ez az opcionális záradék határozza meg az oszlopok listáját, azok típusait, tulajdonságait, leírását és oszlopkorlátjait.

    • column_identifier

      Az oszlopneveknek egyedinek kell lenniük, és le kell képezniük a lekérdezés kimeneti oszlopait.

    • oszlop_típus

      Megadja az oszlop adattípusát. Nem minden adattípust, amelyet az Azure Databricks támogat, támogatnak a streamelési táblák.

    • column_comment

      Egy tetszőleges STRING literál, amely az oszlopot írja le. Ezt a beállítást a beállítással column_typeegyütt kell megadni. Ha az oszloptípus nincs megadva, a program kihagyja az oszlop megjegyzését.

    • column_constraint

      Kényszert ad hozzá, amely érvényesíti az adatokat, amint a táblába kerülnek. Lásd: Adatminőség kezelése folyamatelvárásokkal.

    • Maszk záradék

      Olyan oszlopmaszk funkciót ad hozzá, amely lehetővé teszi a bizalmas adatok anonimizálását.

      Lásd: Sorszűrők és oszlopmaszkok.

  • tábla_korlátozás

    Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi hivatkozásában.

    Megjegyzés:

    A táblakorlátozások meghatározásához a folyamatnak Unity Catalog-kompatibilis folyamatnak kell lennie.

  • táblázat_feltételek

    Opcionálisan megadhatja a tábla particionálási, megjegyzési és felhasználó által definiált tulajdonságait. Minden al záradék csak egyszer adható meg.

    • A DELTA HASZNÁLATA

      Megadja az adatformátumot. Az egyetlen lehetőség a DELTA.

      Ez a záradék opcionális, és alapértelmezés szerint a DELTA értéket veszi fel.

    • PARTÍCIÓVAL

      A tábla particionálásához használandó egy vagy több oszlop választható listája. Kölcsönösen kizárja egymást a CLUSTER BY-vel.

      A folyékony klaszterezés rugalmas, optimalizált megoldást biztosít a csoportosításhoz. Fontolja meg a CLUSTER BY használatát PARTITIONED BY helyett a folyamatvezetékek számára.

    • CLUSTER BY

      Engedélyezze a "liquid clustering" funkciót a táblában, és határozza meg a fürtözési kulcsként használni kívánt oszlopokat. Használjon automatikus folyékony fürtözést, CLUSTER BY AUTOés a Databricks intelligensen választja ki a fürtözési kulcsokat a lekérdezési teljesítmény optimalizálásához. Kölcsönösen kizárja egymást a PARTITIONED BY-vel.

      Lásd: Táblákhoz folyékony klaszterezés használata.

    • HELYSZÍN

      A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére kerül.

    • MEGJEGYZÉS

      A táblázat leírásához választható szó szerinti STRING kifejezés.

    • TBLPROPERTIES

      A tábla táblatulajdonságainak választható listája.

    • VAL ROW FILTER

    Sorszűrő függvényt ad hozzá a táblához. A tábla jövőbeli lekérdezései azoknak a soroknak a részhalmazát kapják meg, amelyekre a függvény IGAZ értéket ad. Ez a részletes hozzáférés-vezérléshez hasznos, mert lehetővé teszi, hogy a függvény megvizsgálja a behívó felhasználó identitását és csoporttagságát, hogy eldöntse, szűr-e bizonyos sorokat.

    Lásd ROW FILTER záradék.

    • ÁRAMLÁS

      Igény szerint beágyazott folyamatot definiál a tábla létrehozásával. A folyamat állapotalapú lekérdezés, amely frissíti a tábla tartalmát. Ha FLOW nincs megadva, használhatja helyette, vagy külön definiálhatja AS query a folyamatokat.CREATE FLOW A következő folyamattípusok egyikét adhatja meg:

      • INSERT NÉV SZERINT

        Adatokat szúr be a táblába oszlopnév alapján. Ha a ONCE beállítás nincs megadva, a lekérdezésnek streamelési lekérdezésnek kell lennie. Használja a STREAM kulcsszót stream szemantikával, hogy olvasson a forrásból. Ha az olvasás egy meglévő rekord módosítását vagy törlését tapasztalja, hibaüzenet jelenik meg. A legbiztonságosabb, ha statikus vagy csak hozzáfűző forrásokból olvas.

        Megjegyzés:

        FLOW INSERT BY NAME a egyenértékű a használatával AS query. A következő két utasítás viselkedése azonos:

        CREATE OR REFRESH STREAMING TABLE raw_data
        AS SELECT * FROM STREAM read_files('abfss://my_path');
        
        CREATE OR REFRESH STREAMING TABLE raw_data
        FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
        
      • EGYSZER

        Igény szerint a folyamatot egyszeri folyamatként, például visszatöltésként definiálja. Ha ONCE meg van adva, a lekérdezés nem streamelési lekérdezés, és a folyamat alapértelmezés szerint egyszer fut. Ha a tábla teljes frissítéssel frissül, a ONCE folyamat újra fut az adatok újbóli létrehozásához. ONCE csak a folyamatokra INSERT BY NAME vonatkozik.

      • AUTO CDC

        Fontos

        Elérhető a Databricks Runtime 17.3-ban és a PREVIEW Pipelines-csatornában.

        AUTO CDC Olyan folyamatot definiál, amely egy forrás adatrögzítési (CDC) rekordjait dolgozza fel a táblába. Akkor használható AUTO CDC , ha a forrásadatok CDC szemantikát tartalmaznak. Lásd az AUTO CDC API-k: Egyszerűsítse a változáskövető adatrögzítést a csővezetékekkel.

  • AS-lekérdezés

    Ez a záradék feltölti a táblát a queryadataival. Ennek a lekérdezésnek egy streamelési lekérdezésnek kell lennie. A STREAM kulcsszóval stream-szemantikát használhat a forrásból való olvasáshoz. Ha az olvasás egy meglévő rekord módosítását vagy törlését tapasztalja, hibaüzenet jelenik meg. A legbiztonságosabb, ha statikus vagy csak hozzáfűző forrásokból olvas. A módosítási véglegesítéseket tartalmazó adatok betöltéséhez hozzáadhatja az olvasási lehetőséget a SkipChangeCommits hibák kezeléséhez.

    Ha egy query-t és egy table_specification-et együtt ad meg, a table_specification-ben megadott táblasémának tartalmaznia kell a queryáltal visszaadott összes oszlopot, ellenkező esetben hibaüzenetet kap. A table_specification megadott, de query által nem visszaadott oszlopok null értékeket ad vissza lekérdezéskor.

    További információért az adatfolyamokról, tekintse meg a Folyamatokkal történő adatátalakítást.

    • Olvasási beállítások

      A lekérdezésben megadhatja az olvasási beállításokat az adatok forrásból való beolvasásának konfigurálásához. Megadhatja skipChangeCommits például, hogy a forrásadatokban szereplő módosítási véglegesítéseket átugorja. Az olvasási beállítások térképként vannak megadva a WITH lekérdezés záradékában. Például:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS=TRUE, STARTINGVERSION=X)
      

      Ez =TRUE nem kötelező, így az alábbihoz hasonló logikai beállítást is megadhat:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS)
      

      Megjegyzés:

      Az olvasási lehetőségek csak a Databricks Runtime 17.3-at vagy újabb verzióját támogatják.

      Az alábbi olvasási lehetőségek támogatottak a Delta esetében, az egyes lehetőségek részleteiért lásd a Delta tábla streamelési olvasásait és írásait.

      • maxFilesPerTrigger
      • maxBytesPerTrigger
      • startingVersion
      • startingTimestamp
      • readChangeFeed
      • withEventTimeOrder
      • skipChangeCommits

Szükséges engedélyek

A folyamat futtató felhasználójának a következő engedélyekkel kell rendelkeznie:

  • SELECT engedély az alaptáblákra, amelyekre a streamelési tábla hivatkozik.
  • USE CATALOG jogosultsággal a szülőkatalóguson, valamint USE SCHEMA jogosultsággal a szülősémán.
  • CREATE MATERIALIZED VIEW jogosultság a streaming tábla sémájára.

Ahhoz, hogy a felhasználó frissíthesse a streamelési táblában definiált folyamatot, a következőt kell megkövetelnie:

  • USE CATALOG jogosultsággal a szülőkatalóguson, valamint USE SCHEMA jogosultsággal a szülősémán.
  • A streamelési tábla tulajdonjoga vagy REFRESH jogosultság a streamelési táblán.
  • A streamelési tábla tulajdonosának jogosultsággal SELECT kell rendelkeznie a streamelési tábla által hivatkozott alaptáblák felett.

Ahhoz, hogy egy felhasználó le tudja kérdezni az eredményként kapott streamelési táblát, a következőre van szükség:

  • USE CATALOG jogosultsággal a szülőkatalóguson, valamint USE SCHEMA jogosultsággal a szülősémán.
  • SELECT jogosultság a streamelési tábla felett.

Korlátozások

  • Csak a táblatulajdonosok frissíthetik a streamelő táblákat a legújabb adatok lekéréséhez.
  • ALTER TABLE parancsok nem engedélyezettek a streamelési táblákon. A tábla definícióját és tulajdonságait a CREATE OR REFRESH vagy ALTER STREAMING TABLE utasítással kell módosítani.
  • A táblaséma DML-parancsokkal (például INSERT INTO) történő fejlesztése és a MERGE nem támogatott.
  • A streamelési táblákban a következő parancsok nem támogatottak:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • A tábla átnevezése vagy a tulajdonos megváltoztatása nem támogatott.
  • A létrehozott oszlopok, identitásoszlopok és alapértelmezett oszlopok nem támogatottak.

Példák

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;

-- Define a streaming table with an inline append flow:
CREATE OR REFRESH STREAMING TABLE raw_data
FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Define a streaming table with an inline AUTO CDC flow:
CREATE OR REFRESH STREAMING TABLE target
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;