Megosztás a következőn keresztül:


CREATE STREAMING TABLE

A következőkre vonatkozik:jelölje be az igennel jelölt jelölőnégyzetet Databricks SQL

Létrehoz egy streamelési táblát, egy Delta-táblát, amely további támogatást nyújt a streameléshez vagy a növekményes adatfeldolgozáshoz.

A streamelési táblák csak a Lakeflow Spark Deklaratív folyamatokban, valamint a Databricks SQL-ben és a Unity Catalogban támogatottak. A támogatott Databricks Runtime Compute-alapú parancs futtatása csak a szintaxist elemzi. Lásd Lakeflow Spark deklaratív csatornák kódja SQL-lel.

Szintaxis

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Paraméterek

  • REFRESH

    Ha meg van adva, frissíti a táblát a lekérdezésben meghatározott forrásokból elérhető legfrissebb adatokkal. Csak azokat az új adatokat dolgozzuk fel, amelyek a lekérdezés elkezdése előtt érkeznek meg. A parancs végrehajtása során a forrásokhoz hozzáadott új adatokat a rendszer a következő frissítésig figyelmen kívül hagyja. A CREATE VAGY REFRESH frissítési művelete teljes mértékben deklaratív. Ha egy frissítési parancs nem adja meg az eredeti táblalétrehozó utasítás összes metaadatait, a program törli a meg nem határozott metaadatokat.

  • HA NEM LÉTEZIK

    Létrehozza a streamelési táblát, ha nem létezik. Ha már létezik ilyen nevű tábla, a CREATE STREAMING TABLE utasítás figyelmen kívül lesz hagyva.

    Legfeljebb az egyiket adhatja meg: IF NOT EXISTS vagy OR REFRESH.

  • table_name

    A létrehozandó tábla neve. A név nem tartalmazhat időbeli specifikációt vagy beállításspecifikációt. Ha a név nincs minősítve, a tábla az aktuális sémában jön létre.

  • táblázat_specifikáció

    Ez az opcionális záradék határozza meg az oszlopok listáját, azok típusait, tulajdonságait, leírását és oszlopkorlátjait.

    Ha nem definiál oszlopokat a táblázatsémában, meg kell adnia AS query.

    • column_identifier

      Az oszlop egyedi neve.

      • oszloptípus

        Megadja az oszlop adattípusát.

      • NOT NULL

        Ha meg van adva, az oszlop nem fogadja el NULL értékeket.

      • MEGJEGYZÉS column_comment

        Karakterlánc-literál az oszlop leírására.

      • column_constraint

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        Elsődleges vagy idegenkulcs-korlátozást ad hozzá egy adatfolyam-tábla oszlopához. A hive_metastore katalógus táblái nem támogatják a korlátozásokat.

      • MASZK záradék

        Hozzáad egy oszlopmaszk-függvényt a bizalmas adatok anonimizálására. Az oszlop minden további lekérdezése megkapja a függvény kiértékelésének eredményét az oszlopon az oszlop eredeti értéke helyett. Ez hasznos lehet részletes hozzáférés-vezérlési célokra, ahol a függvény megvizsgálhatja az invokáló felhasználó identitását vagy csoporttagságát, hogy eldöntse, elrejti-e az értéket.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ SÉRTÉS ESETÉN { HIBA UPDATE | SOR ELHAGYÁSA } ]

        Adatminőségi elvárásokat ad hozzá a táblához. Ezek az adatminőségi elvárások idővel nyomon követhetők és elérhetők a streamelési tábla eseménynaplójában. A FAIL UPDATE elvárás miatt a feldolgozás sikertelen lesz a tábla létrehozásakor és a tábla frissítésekor is. Ha DROP ROW a várakozás nem teljesül, az egész sor elvetésre kerül.

        expectation_expr állhat literálokból, táblán belüli oszlopazonosítókból és determinista, beépített SQL-függvényekből vagy operátorokból, kivéve:

        Emellett expr nem tartalmazhat al-lekérdezést.

      • tábla_korlátozás

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        Információs elsődleges kulcsot vagy információs idegenkulcs-korlátozásokat ad hozzá egy streamelési táblához. A kulcskorlátozások nem támogatottak a hive_metastore katalógusban lévő táblák esetében.

  • táblázat_feltételek

    Opcionálisan megadhatja a particionálást, a megjegyzéseket, a felhasználó által definiált tulajdonságokat és az új tábla frissítési ütemezését. Minden al záradék csak egyszer adható meg.

    • FEL�OSZTVA VALAMI SZERINT

      A tábla oszlopainak választható listája a tábla particionálásához.

      Feljegyzés

      A folyadék alapú klaszterezés rugalmas, optimalizált megoldást biztosít. Fontolja meg a streamelési táblák esetében a CLUSTER BY helyett a PARTITIONED BY használatát.

    • CLUSTER BY

      Opcionális feltétel az oszlopok egy részhalmaza alapján történő csoportosításhoz. Használjon automatikus folyékony fürtözést, CLUSTER BY AUTOés a Databricks intelligensen választja ki a fürtözési kulcsokat a lekérdezési teljesítmény optimalizálásához. Lásd: Táblákhoz folyékony klaszterezés használata.

      A folyékony fürtözés nem kombinálható a következővel PARTITIONED BY: .

    • MEGJEGYZÉS table_comment

      Egy STRING kifejezés a táblázat leírásához.

    • ALAPÉRTELMEZETT RENDEZÉSI UTF8_BINARY

      A következőre vonatkozik:yes Databricks SQL check mark yes Databricks Runtime 17.1 és újabb

      Kényszeríti a streamelési tábla alapértelmezett rendezést a következőre UTF8_BINARY: . Ez a záradék kötelező, ha a táblázatot létrehozó séma nem alapértelmezett rendezéssel UTF8_BINARYrendelkezik. A streaming tábla alapértelmezett rendezése alapértelmezetten van használva az query-ban és az oszloptípusoknál.

    • TBLPROPERTIES

      Igény szerint beállíthat egy vagy több felhasználó által definiált tulajdonságot.

      Ezzel a beállítással megadhatja az utasítás futtatásához használt Lakeflow Spark Deklaratív folyamatok futtatókörnyezeti csatornát. Állítsa a pipelines.channel tulajdonság értékét "PREVIEW" vagy "CURRENT"értékre. Az alapértelmezett érték "CURRENT". További információ a Lakeflow Spark Deklaratív folyamatok csatornáiról: Lakeflow Spark Deklaratív folyamatok futtatókörnyezeti csatornái.

    • menetrend

      Az ütemezés lehet utasítás SCHEDULE vagy TRIGGER utasítás.

      • ÜTEMTERV [ REFRESH ] ütemezési kikötés

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          A rendszeres frissítés ütemezéséhez használja EVERY szintaxist. Ha EVERY szintaxis van megadva, a streamelési táblázat vagy a materializált nézet rendszeres időközönként frissül a megadott érték alapján, például HOUR, HOURS, DAY, DAYS, WEEKvagy WEEKS. Az alábbi táblázat a numberelfogadott egész számértékeket sorolja fel.

          Időegység Egész számérték
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= W <= 8

          Feljegyzés

          A belefoglalt időegység egyes és többes formái szemantikailag egyenértékűek.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Frissítés ütemezése kvarc cron érték használatával. Érvényes time_zone_values elfogadhatók. AT TIME ZONE LOCAL nem támogatott.

          Ha AT TIME ZONE hiányzik, a munkamenet időzónája lesz használatban. Ha AT TIME ZONE hiányzik, és a munkamenet időzónája nincs beállítva, hibaüzenet jelenik meg. SCHEDULE szemantikailag egyenértékű a SCHEDULE REFRESH.

        Az ütemezés a parancs részeként is megadható CREATE . Használja ALTER STREAMING TABLE vagy futtassa a CREATE OR REFRESH parancsot SCHEDULE záradékkal a streamelési tábla létrehozás utáni ütemezésének módosításához.

      • TRIGGER BEKAPCSOLVA UPDATE [ LEGFELJEBB MINDEN TRIGGER_INTERVAL ]

        Fontos

        A TRIGGER ON UPDATE funkció bétaverzióban érhető el.

        Ha szeretné, állítsa a táblát frissítésre egy felsőbb rétegbeli adatforrás frissítésekor, legfeljebb percenként egyszer. Állítson be egy értéket AT MOST EVERY , amely legalább egy minimális időt igényel a frissítések között.

        A felsőbb rétegbeli adatforrásoknak külső vagy felügyelt Delta-tábláknak kell lenniük (beleértve a materializált nézeteket vagy streamelő táblákat), vagy felügyelt nézeteknek, amelyek függőségei a támogatott táblatípusokra korlátozódnak.

        A fájlesemények engedélyezésével az eseményindítók teljesítményesebbek lehetnek, és növelik a triggerfrissítések korlátait.

        Az trigger_intervalINTERVALLUM utasítás legalább 1 perc.

        TRIGGER ON UPDATE az alábbi korlátozásokkal rendelkezik:

        • A TRIGGER ON UPDATEhasználata esetén streamelési táblánként legfeljebb 10 streamelő adatforrást lehet használni.
        • A TRIGGER ON UPDATEhasználatával legfeljebb 1000 streamelési táblázat vagy materializált nézet adható meg.
        • A AT MOST EVERY záradék alapértelmezés szerint 1 perc, és nem lehet kevesebb, mint 1 perc.
  • A ROW FILTER záradékkal

    Sorszűrő függvényt ad hozzá a táblához. A tábla minden további lekérdezése megkapja azoknak a soroknak a részhalmazát, ahol a függvény logikai értéke IGAZ. Ez hasznos lehet részletes hozzáférés-vezérlési célokra, ahol a függvény megvizsgálhatja a behívó felhasználó identitását vagy csoporttagságát, hogy eldöntse, szűr-e bizonyos sorokat.

  • AS-lekérdezés

    Ez a záradék feltölti a táblát a queryadataival. Ennek a lekérdezésnek egy streamelési lekérdezésnek kell lennie. Ez úgy érhető el, hogy hozzáadja a STREAM kulcsszót minden olyan relációhoz, amelyet növekményesen szeretne feldolgozni. Ha egy query-t és egy table_specification-et együtt ad meg, a table_specification-ben megadott táblasémának tartalmaznia kell a queryáltal visszaadott összes oszlopot, ellenkező esetben hibaüzenetet kap. A table_specification megadott, de query által nem visszaadott oszlopok null értékeket ad vissza lekérdezéskor.

A streaming táblák és más táblák közötti különbségek

A streaming táblák állapotmegőrző táblák, amelyek úgy vannak kialakítva, hogy az egyes sorokat csak egyszer kezeljék, amikor egy növekvő adatkészletet dolgoz fel. Mivel a legtöbb adathalmaz folyamatosan nő az idő függvényében, a streamelési táblák a legtöbb betöltési számítási feladathoz jól használhatók. A streamelési táblák optimálisak olyan folyamatokhoz, amelyek adatfrissítést és alacsony késést igényelnek. A streamelési táblák nagy léptékű átalakításokhoz is hasznosak lehetnek, mivel az eredmények növekményesen kiszámíthatók az új adatok érkezésekor, így az eredmények naprakészek maradnak anélkül, hogy az összes forrásadatot teljesen újra kellene komponálnunk minden frissítéssel. Az adatfolyam-táblákat kizárólag olyan adatforrásokhoz tervezték, amelyek csak kiegészítést tesznek lehetővé.

A streamelő táblák további parancsokat is elfogadnak, például REFRESH, amelyek a lekérdezésben megadott forrásokban elérhető legfrissebb adatokat dolgoznak fel. A megadott lekérdezés módosításai csak az új adatokon jelennek meg egy REFRESHmeghívásával; a korábban feldolgozott adatokra ezek nem vonatkoznak. A meglévő adatok módosításainak alkalmazásához futtatnia kell a REFRESH TABLE <table_name> FULL parancsot, hogy végrehajtson egy FULL REFRESH. A teljes frissítések újra feldolgozzák a forrásban elérhető összes adatot a legújabb definícióval. Nem ajánlott teljes frissítéseket meghívni olyan forrásokra, amelyek nem őrzik meg az adatok teljes előzményeit, vagy rövid megőrzési időszakuk van (például Kafka), mivel a teljes frissítés csonkolja a meglévő adatokat. Előfordulhat, hogy nem tudja helyreállítani a régi adatokat, ha az adatok már nem érhetők el a forrásban.

Sorszűrők és oszlopmaszkok

A sorszűrők lehetővé teszik egy olyan függvény megadását, amely szűrőként van alkalmazva, amikor egy táblavizsgálat sorokat olvas be. Ezek a szűrők biztosítják, hogy a későbbi lekérdezések csak olyan sorokat adjanak vissza, amelyek esetében a szűrő predikátum értéke igaz.

Az oszlopmaszkok lehetővé teszik az oszlopok értékeinek maszkolását, amikor egy táblázat beolvassa a sorokat. Az oszlopot érintő minden jövőbeli lekérdezés megkapja a függvény kiértékelésének eredményét az oszlopon, lecserélve az oszlop eredeti értékét.

További információ a sorszűrők és oszlopmaszkok használatáról: Sorszűrők és oszlopmaszkok.

Sorszűrők és oszlopmaszkok kezelése

A streamelési táblák sorszűrőit és oszlopmaszkjait a CREATE OR REFRESH utasítással kell hozzáadni, frissíteni vagy elvetni.

Működés

  • Frissítés definiálóként: Amikor a CREATE OR REFRESH sorok vagy REFRESH utasítások frissítenek egy streamelési táblát, a sorszűrő függvények a definiáló jogosultságaival futnak (táblatulajdonosként). Ez azt jelenti, hogy a táblafrissítés a streamelési táblát létrehozó felhasználó biztonsági környezetét használja.
  • Lekérdezés: Bár a legtöbb szűrő a definiáló jogosultságaival fut, a felhasználói környezetet ellenőrző függvények (például CURRENT_USER és IS_MEMBER) kivételek. Ezek a függvények a meghívó jogosultságaival futnak. Ez a megközelítés a felhasználóspecifikus adatbiztonságot és hozzáférés-vezérlést kényszeríti ki az aktuális felhasználó kontextusa alapján.

Megfigyelhetőség

A DESCRIBE EXTENDED, a INFORMATION_SCHEMAvagy a Katalóguskezelő használatával vizsgálja meg az adott streamtáblára vonatkozó meglévő sorszűrőket és oszlopmaszkokat. Ez a funkció lehetővé teszi a felhasználók számára az adathozzáférési és védelmi intézkedések naplózását és felülvizsgálatát a streamelési táblákon.

Korlátozások

  • Csak a táblatulajdonosok frissíthetik a streamelő táblákat a legújabb adatok lekéréséhez.
  • ALTER TABLE parancsok nem engedélyezettek a streamelési táblákon. A tábla definícióját és tulajdonságait a CREATE OR REFRESH vagy ALTER STREAMING TABLE utasítással kell módosítani.
  • A táblaséma DML-parancsokkal (például INSERT INTO) történő fejlesztése és a MERGE nem támogatott.
  • A streamelési táblákban a következő parancsok nem támogatottak:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • A deltamegosztás nem támogatott.
  • A tábla átnevezése vagy a tulajdonos megváltoztatása nem támogatott.
  • Streaming táblákra vonatkozó táblakorlátozások, mint például a PRIMARY KEY és a FOREIGN KEY, nincsenek támogatva az hive_metastore katalógusban.
  • A létrehozott oszlopok, identitásoszlopok és alapértelmezett oszlopok nem támogatottak.

Példák

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')