STREAMELÉSI TÁBLA LÉTREHOZÁSA
A következőkre vonatkozik: Databricks SQL Databricks Runtime 13.3 LTS és újabb
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Létrehoz egy streamelési táblát, egy Delta-táblát, amely további támogatást nyújt a streameléshez vagy a növekményes adatfeldolgozáshoz.
A streamelési táblák csak a Delta Live Tablesben és a Databricks SQL-ben és a Unity Catalogban támogatottak. A támogatott Databricks Runtime Compute-alapú parancs futtatása csak a szintaxist elemzi. Lásd: Delta Live Tables-folyamat implementálása AZ SQL használatával.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Paraméterek
REFRESH
Ha meg van adva, frissíti a táblát a lekérdezésben meghatározott forrásokból elérhető legfrissebb adatokkal. Csak a lekérdezés indítása előtt megérkezik új adatok feldolgozása. A parancs végrehajtása során a forrásokhoz hozzáadott új adatokat a rendszer a következő frissítésig figyelmen kívül hagyja.
HA NEM LÉTEZIK
Ha meg van adva, és már létezik ilyen nevű tábla, a rendszer figyelmen kívül hagyja az utasítást.
IF NOT EXISTS
nem használható együtt,REFRESH
ami azt jelentiCREATE OR REFRESH TABLE IF NOT EXISTS
, hogy nem engedélyezett.-
A létrehozandó tábla neve. A név nem tartalmazhat időbeli specifikációt. Ha a név nincs minősítve, a tábla az aktuális sémában jön létre.
table_specification
Ez az opcionális záradék határozza meg az oszlopok listáját, azok típusait, tulajdonságait, leírását és oszlopkorlátjait.
Ha nem definiál oszlopokat a táblázatsémában, meg kell adnia
AS query
.-
Az oszlop egyedi neve.
-
Az oszlop adattípusát adja meg.
NOT NULL
Ha meg van adva, az oszlop nem fogad el
NULL
értékeket.MEGJEGYZÉS column_comment
Sztringkonstans az oszlop leírásához.
-
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Elsődleges vagy idegenkulcs-korlátozást ad hozzá egy streamelési tábla oszlopához. A katalógus táblái nem támogatják a
hive_metastore
korlátozásokat. KORLÁTOZÁS EXPECTATION_NAME VÁRHATÓ (expectation_expr) [ A JOGSÉRTÉS { SIKERTELEN FRISSÍTÉS | DROP ROW } ]
Adatminőségi elvárásokat ad hozzá a táblához. Ezek az adatminőségi elvárások idővel nyomon követhetők és elérhetők a streamelési tábla eseménynaplójában. A
FAIL UPDATE
várakozás miatt a feldolgozás sikertelen lesz a tábla létrehozásakor és a tábla frissítésekor is. HaDROP ROW
a várakozás nem teljesül, a várakozás a teljes sort elveti.expectation_expr
A tábla literálokból, oszlopazonosítókból és determinisztikus, beépített SQL-függvényekből vagy operátorokból állhat, kivéve:- Függvények összesítése
- Elemzési ablakfüggvények
- Rangsorolási ablakfüggvények
- Táblaértékelő generátorfüggvények
- Függvények összesítése
-
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Információs elsődleges kulcsot vagy információs idegenkulcs-korlátozásokat ad hozzá egy streamelési táblához. A katalógus táblái nem támogatják a
hive_metastore
fő korlátozásokat.
-
-
table_clauses
Opcionálisan megadhatja a particionálást, a megjegyzéseket, a felhasználó által definiált tulajdonságokat és az új tábla frissítési ütemezését. Minden al záradék csak egyszer adható meg.
-
A tábla oszlopainak választható listája a tábla particionálásához.
MEGJEGYZÉS table_comment
A
STRING
táblázatot leíró literál.-
Igény szerint beállíthat egy vagy több felhasználó által definiált tulajdonságot.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Ha meg van adva, ütemezi a streamelési táblát vagy a materializált nézetet, hogy az adatokat a megadott kvarc cron ütemezéssel frissítse. Csak time_zone_values fogadunk el.
AT TIME ZONE LOCAL
nem támogatott. HaAT TIME ZONE
hiányzik, a munkamenet időzónája lesz használatban. HaAT TIME ZONE
hiányzik, és a munkamenet időzónája nincs beállítva, hibaüzenet jelenik meg.SCHEDULE
szemantikailag egyenértékű aSCHEDULE REFRESH
.A szintaxis nem használható a
SCHEDULE
Delta Live Tables folyamatdefiníciójában.A
SCHEDULE
záradék parancsbanCREATE OR REFRESH
nem engedélyezett. Az ütemezés a parancs részeként is megadhatóCREATE
. Az ALTER STREAMING TABLE használatával módosíthatja a streamelőtáblák létrehozás utáni ütemezését.
-
-
Ez a záradék feltölti a táblát a forrásból származó
query
adatokkal. Ennek a lekérdezésnek streamelési lekérdezésnek kell lennie. Ez úgy érhető el, hogy hozzáadja aSTREAM
kulcsszót minden olyan relációhoz, amelyet növekményesen szeretne feldolgozni. Ha egyquery
és egytable_specification
együttes értéket ad meg, a megadotttable_specification
táblázatsémának tartalmaznia kell a megadott oszlop általquery
visszaadott összes oszlopot, ellenkező esetben hibaüzenet jelenik meg. A lekérdezéskor megadotttable_specification
, de a visszaadottnull
értékek általquery
nem visszaadott oszlopok.Ez a záradék a Databricks SQL-ben létrehozott streamelési táblákhoz szükséges, de a Delta Live Tablesben nem szükséges. Ha ez a záradék nincs megadva a Delta Live Tablesben, hivatkoznia kell erre a táblára a DLT-folyamat egyik
APPLY CHANGES
parancsában. Lásd: Adatrögzítés módosítása az SQL-vel a Delta Live Tablesben.
A streamelő táblák és más táblák közötti különbségek
A streamelő táblák állapotalapú táblák, amelyek úgy vannak kialakítva, hogy az egyes sorokat csak egyszer kezeljék, amikor egyre növekvő adatkészletet dolgoz fel. Mivel a legtöbb adathalmaz folyamatosan nő az idő függvényében, a streamelési táblák a legtöbb betöltési számítási feladathoz jól használhatók. A streamelési táblák optimálisak olyan folyamatokhoz, amelyek adatfrissítést és alacsony késést igényelnek. A streamelési táblák nagy léptékű átalakításokhoz is hasznosak lehetnek, mivel az eredmények növekményesen kiszámíthatók az új adatok érkezésekor, így az eredmények naprakészek maradnak anélkül, hogy az összes forrásadatot teljesen újra kellene komponálnunk minden frissítéssel. A streamelési táblák csak hozzáfűző adatforrásokhoz készültek.
A streamelő táblák további parancsokat fogadnak el, például REFRESH
a lekérdezésben megadott forrásokban elérhető legfrissebb adatokat feldolgozó parancsokat. A megadott lekérdezés módosításai csak a korábban nem feldolgozott adatok meghívásával REFRESH
jelennek meg az új adatokon. A meglévő adatok módosításainak alkalmazásához végre kell hajtania REFRESH TABLE <table_name> FULL
egy FULL REFRESH
. A teljes frissítések újra feldolgozzák a forrásban elérhető összes adatot a legújabb definícióval. Nem ajánlott teljes frissítéseket meghívni olyan forrásokra, amelyek nem őrzik meg az adatok teljes előzményeit, vagy rövid megőrzési időszakuk van (például Kafka), mivel a teljes frissítés csonkolja a meglévő adatokat. Előfordulhat, hogy nem tudja helyreállítani a régi adatokat, ha az adatok már nem érhetők el a forrásban.
Korlátozások
Csak a táblatulajdonosok frissíthetik a streamelő táblákat a legújabb adatok lekéréséhez.
ALTER TABLE
parancsok nem engedélyezettek a streamelési táblákon. A tábla definícióját és tulajdonságait az utasítássalALTER STREAMING TABLE
kell módosítani.Az időutazási lekérdezések nem támogatottak.
A táblaséma fejlesztése DML-parancsokkal, például
INSERT INTO
, ésMERGE
nem támogatott.A streamelési táblákban a következő parancsok nem támogatottak:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
A deltamegosztás nem támogatott.
A tábla átnevezése vagy a tulajdonos módosítása nem támogatott.
Az olyan táblakorlátozások, mint például
PRIMARY KEY
FOREIGN KEY
a nem támogatottak.A létrehozott oszlopok, identitásoszlopok és alapértelmezett oszlopok nem támogatottak.
Példák
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
Kapcsolódó cikkek
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: