Megosztás a következőn keresztül:


STREAMELÉSI TÁBLA LÉTREHOZÁSA

A következőkre vonatkozik: jelölje be az igennel jelölt jelölőnégyzetet Databricks SQL

Létrehoz egy streamelési táblát, egy Delta-táblát, amely további támogatást nyújt a streameléshez vagy a növekményes adatfeldolgozáshoz.

A streamelési táblák csak a Delta Live Tablesben és a Databricks SQL-ben és a Unity Catalogban támogatottak. A támogatott Databricks Runtime Compute-alapú parancs futtatása csak a szintaxist elemzi. Lásd: Delta Live Tables-folyamat implementálása AZ SQL használatával.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Paraméterek

  • REFRESH

    Ha meg van adva, frissíti a táblát a lekérdezésben meghatározott forrásokból elérhető legfrissebb adatokkal. Csak a lekérdezés indítása előtt megérkezik új adatok feldolgozása. A parancs végrehajtása során a forrásokhoz hozzáadott új adatokat a rendszer a következő frissítésig figyelmen kívül hagyja. A CREATE VAGY REFRESH frissítési művelete teljes mértékben deklaratív. Ha egy frissítési parancs nem adja meg az eredeti táblalétrehozó utasítás összes metaadatait, a program törli a meg nem határozott metaadatokat.

  • HA NEM LÉTEZIK

    Létrehozza a streamelési táblát, ha nem létezik. Ha már létezik ilyen nevű tábla, a rendszer figyelmen kívül hagyja az CREATE STREAMING TABLE utasítást.

    Legfeljebb egyet IF NOT EXISTS OR REFRESHvagy egyet adhat meg.

  • table_name

    A létrehozandó tábla neve. A név nem tartalmazhat időbeli specifikációt. Ha a név nincs minősítve, a tábla az aktuális sémában jön létre.

  • table_specification

    Ez az opcionális záradék határozza meg az oszlopok listáját, azok típusait, tulajdonságait, leírását és oszlopkorlátjait.

    Ha nem definiál oszlopokat a táblázatsémában, meg kell adnia AS query.

    • column_identifier

      Az oszlop egyedi neve.

      • column_type

        Az oszlop adattípusát adja meg.

      • NOT NULL

        Ha meg van adva, az oszlop nem fogad el NULL értékeket.

      • MEGJEGYZÉS column_comment

        Sztringkonstans az oszlop leírásához.

      • column_constraint

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        Elsődleges vagy idegenkulcs-korlátozást ad hozzá egy streamelési tábla oszlopához. A katalógus táblái nem támogatják a hive_metastore korlátozásokat.

      • MASZK záradék

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        Oszlopmaszk-függvényt ad hozzá a bizalmas adatok anonimizálásához. Az oszlop minden további lekérdezése megkapja a függvény kiértékelésének eredményét az oszlopon az oszlop eredeti értéke helyett. Ez hasznos lehet részletes hozzáférés-vezérlési célokra, ahol a függvény megvizsgálhatja az invokáló felhasználó identitását vagy csoporttagságát, hogy eldöntse, újra ki kívánja-e használni az értéket.

      • KORLÁTOZÁS EXPECTATION_NAME VÁRHATÓ (expectation_expr) [ A JOGSÉRTÉS { SIKERTELEN FRISSÍTÉS | DROP ROW } ]

        Adatminőségi elvárásokat ad hozzá a táblához. Ezek az adatminőségi elvárások idővel nyomon követhetők és elérhetők a streamelési tábla eseménynaplójában. A FAIL UPDATE várakozás miatt a feldolgozás sikertelen lesz a tábla létrehozásakor és a tábla frissítésekor is. Ha DROP ROW a várakozás nem teljesül, a várakozás a teljes sort elveti.

        expectation_expr A tábla literálokból, oszlopazonosítókból és determinisztikus, beépített SQL-függvényekből vagy operátorokból állhat, kivéve:

        Emellett expr nem tartalmazhat alqueryt.

      • table_constraint

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        Információs elsődleges kulcsot vagy információs idegenkulcs-korlátozásokat ad hozzá egy streamelési táblához. A katalógus táblái nem támogatják a hive_metastore fő korlátozásokat.

  • table_clauses

    Opcionálisan megadhatja a particionálást, a megjegyzéseket, a felhasználó által definiált tulajdonságokat és az új tábla frissítési ütemezését. Minden al záradék csak egyszer adható meg.

    • PARTICIONÁLT

      A tábla oszlopainak választható listája a tábla particionálásához.

    • MEGJEGYZÉS table_comment

      A STRING táblázatot leíró literál.

    • TBLPROPERTIES

      Igény szerint beállíthat egy vagy több felhasználó által definiált tulajdonságot.

      Ezzel a beállítással megadhatja az utasítás futtatásához használt Delta Live Tables futtatókörnyezeti csatornát. Állítsa be a tulajdonság értékét a pipelines.channel következőre "PREVIEW" : vagy "CURRENT". Az alapértelmezett érték "CURRENT". A Delta Live Tables-csatornákról további információt a Delta Live Tables futtatókörnyezeti csatornáiban talál.

    • ÜTEMEZÉS [ FRISSÍTÉS ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        Fontos

        Ez a funkció a nyilvános előzetes verzióban érhető el.

        A rendszeres frissítés ütemezéséhez szintaxist használjon EVERY . Ha EVERY a szintaxis meg van adva, a streamelési táblázat vagy a materializált nézet rendszeres időközönként frissül a megadott időközönként a megadott érték alapján, például HOUR, HOURS, DAY, DAYS, WEEKvagy WEEKS. Az alábbi táblázat az elfogadott egész számértékeket sorolja fel a következőhöz number: .

        Időegység Egész számérték
        HOUR or HOURS 1 <= H <= 72
        DAY or DAYS 1 <= D <= 31
        WEEK or WEEKS 1 <= W <= 8

        Feljegyzés

        A belefoglalt időegység egyes és többes formái szemantikailag egyenértékűek.

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        Frissítés ütemezése kvarc cron érték használatával. A rendszer érvényes time_zone_values fogad el. AT TIME ZONE LOCAL nem támogatott.

        Ha AT TIME ZONE hiányzik, a munkamenet időzónája lesz használatban. Ha AT TIME ZONE hiányzik, és a munkamenet időzónája nincs beállítva, hibaüzenet jelenik meg. SCHEDULE szemantikailag egyenértékű a SCHEDULE REFRESH.

      Az ütemezés a parancs részeként is megadható CREATE . Használja az ALTER STREAMING TABLE parancsot, vagy futtassa CREATE OR REFRESH a parancsot záradékkal SCHEDULE a streamelési tábla létrehozás utáni ütemezésének módosításához.

    • WITH ROW FILTER záradék

      Fontos

      Ez a funkció a nyilvános előzetes verzióban érhető el.

      Sorszűrő függvényt ad hozzá a táblához. A tábla minden további lekérdezése megkapja azoknak a soroknak a részhalmazát, amelyekben a függvény igaz logikai értéket ad ki. Ez hasznos lehet részletes hozzáférés-vezérlési célokra, ahol a függvény megvizsgálhatja a behívó felhasználó identitását vagy csoporttagságát, hogy eldöntse, szűr-e bizonyos sorokat.

  • AS-lekérdezés

    Ez a záradék feltölti a táblát a forrásból származó queryadatokkal. Ennek a lekérdezésnek streamelési lekérdezésnek kell lennie. Ez úgy érhető el, hogy hozzáadja a STREAM kulcsszót minden olyan relációhoz, amelyet növekményesen szeretne feldolgozni. Ha egy query és egy table_specification együttes értéket ad meg, a megadott table_specification táblázatsémának tartalmaznia kell a megadott oszlop által queryvisszaadott összes oszlopot, ellenkező esetben hibaüzenet jelenik meg. A lekérdezéskor megadotttable_specification, de a visszaadott null értékek által query nem visszaadott oszlopok.

A streamelő táblák és más táblák közötti különbségek

A streamelő táblák állapotalapú táblák, amelyek úgy vannak kialakítva, hogy az egyes sorokat csak egyszer kezeljék, amikor egyre növekvő adatkészletet dolgoz fel. Mivel a legtöbb adathalmaz folyamatosan nő az idő függvényében, a streamelési táblák a legtöbb betöltési számítási feladathoz jól használhatók. A streamelési táblák optimálisak olyan folyamatokhoz, amelyek adatfrissítést és alacsony késést igényelnek. A streamelési táblák nagy léptékű átalakításokhoz is hasznosak lehetnek, mivel az eredmények növekményesen kiszámíthatók az új adatok érkezésekor, így az eredmények naprakészek maradnak anélkül, hogy az összes forrásadatot teljesen újra kellene komponálnunk minden frissítéssel. A streamelési táblák csak hozzáfűző adatforrásokhoz készültek.

A streamelő táblák további parancsokat fogadnak el, például REFRESHa lekérdezésben megadott forrásokban elérhető legfrissebb adatokat feldolgozó parancsokat. A megadott lekérdezés módosításai csak a korábban nem feldolgozott adatok meghívásával REFRESHjelennek meg az új adatokon. A meglévő adatok módosításainak alkalmazásához végre kell hajtania REFRESH TABLE <table_name> FULL egy FULL REFRESH. A teljes frissítések újra feldolgozzák a forrásban elérhető összes adatot a legújabb definícióval. Nem ajánlott teljes frissítéseket meghívni olyan forrásokra, amelyek nem őrzik meg az adatok teljes előzményeit, vagy rövid megőrzési időszakuk van (például Kafka), mivel a teljes frissítés csonkolja a meglévő adatokat. Előfordulhat, hogy nem tudja helyreállítani a régi adatokat, ha az adatok már nem érhetők el a forrásban.

Sorszűrők és oszlopmaszkok

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

A sorszűrők lehetővé teszik egy olyan függvény megadását, amely szűrőként van alkalmazva, amikor egy táblavizsgálat sorokat olvas be. Ezek a szűrők biztosítják, hogy a későbbi lekérdezések csak olyan sorokat adjanak vissza, amelyek esetében a szűrő predikátum értéke igaz.

Az oszlopmaszkok lehetővé teszik az oszlopok értékeinek maszkolását, amikor egy táblázat beolvassa a sorokat. Az oszlopot érintő minden jövőbeli lekérdezés megkapja a függvény kiértékelésének eredményét az oszlopon, lecserélve az oszlop eredeti értékét.

A sorszűrők és oszlopmaszkok használatáról további információt a bizalmas táblázatadatok szűrése sorszűrők és oszlopmaszkok használatával című témakörben talál.

Sorszűrők és oszlopmaszkok kezelése

A streamelési táblák sorszűrőit és oszlopmaszkjait hozzá kell adni, frissíteni vagy elvetni az CREATE OR REFRESH utasítással.

Működés

  • Frissítés definiálóként: Amikor a CREATE OR REFRESH sorok vagy REFRESH utasítások frissítenek egy streamelési táblát, a sorszűrő függvények a definiáló jogosultságaival futnak (táblatulajdonosként). Ez azt jelenti, hogy a táblafrissítés a streamelési táblát létrehozó felhasználó biztonsági környezetét használja.
  • Lekérdezés: Bár a legtöbb szűrő a definiáló jogosultságaival fut, a felhasználói környezetet ellenőrző függvények (például CURRENT_USER és IS_MEMBER) kivételek. Ezek a függvények meghívóként futnak. Ez a megközelítés a felhasználóspecifikus adatbiztonságot és hozzáférés-vezérlést kényszeríti ki az aktuális felhasználó kontextusa alapján.

Megfigyelhetőség

INFORMATION_SCHEMAA Katalóguskezelővel DESCRIBE EXTENDEDvagy a Katalóguskezelővel megvizsgálhatja azokat a meglévő sorszűrőket és oszlopmaszkokat, amelyek egy adott streamelési táblára vonatkoznak. Ez a funkció lehetővé teszi a felhasználók számára az adathozzáférési és védelmi intézkedések naplózását és felülvizsgálatát a streamelési táblákon.

Korlátozások

  • Csak a táblatulajdonosok frissíthetik a streamelő táblákat a legújabb adatok lekéréséhez.

  • ALTER TABLE parancsok nem engedélyezettek a streamelési táblákon. A tábla definícióját és tulajdonságait az ALTER STREAMING TABLE utasítással CREATE OR REFRESH kell módosítani.

  • Az időutazási lekérdezések nem támogatottak.

  • A táblaséma fejlesztése DML-parancsokkal, például INSERT INTO, és MERGE nem támogatott.

  • A streamelési táblákban a következő parancsok nem támogatottak:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • A deltamegosztás nem támogatott.

  • A tábla átnevezése vagy a tulajdonos módosítása nem támogatott.

  • Az olyan táblakorlátozások, mint például PRIMARY KEY FOREIGN KEY a nem támogatottak.

  • A létrehozott oszlopok, identitásoszlopok és alapértelmezett oszlopok nem támogatottak.

Példák

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')