SKAPA DIREKTUPPSPELNINGSTABELL
Gäller för: Databricks SQL Databricks Runtime 13.3 LTS och senare
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion. Om du vill registrera dig för åtkomst fyller du i det här formuläret.
Skapar en strömmande tabell, en Delta-tabell med extra stöd för direktuppspelning eller inkrementell databearbetning.
Direktuppspelningstabeller stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Implementera en Delta Live Tables-pipeline med SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Parametrar
UPPDATERA
Om det anges uppdaterar tabellen med de senaste tillgängliga data från källorna som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras till nästa uppdatering.
OM INTE FINNS
Om det anges och det redan finns en tabell med samma namn ignoreras -instruktionen.
IF NOT EXISTS
kan inte användas tillsammans medREFRESH
, vilket innebär attCREATE OR REFRESH TABLE IF NOT EXISTS
det inte är tillåtet.-
Namnet på tabellen som ska skapas. Namnet får inte innehålla någon temporal specifikation. Om namnet inte är kvalificerat skapas tabellen i det aktuella schemat.
table_specification
Den här valfria satsen definierar listan över kolumner, deras typer, egenskaper, beskrivningar och kolumnbegränsningar.
Om du inte definierar kolumner i tabellschemat måste du ange
AS query
.-
Ett unikt namn för kolumnen.
-
Anger kolumnens datatyp .
INTE NULL
Om det anges accepterar
NULL
kolumnen inte värden.KOMMENTAR column_comment
En strängliteral som beskriver kolumnen.
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en primärnyckel eller sekundärnyckelbegränsning i kolumnen i en strömmande tabell. Begränsningar stöds inte för tabeller i
hive_metastore
katalogen. BEGRÄNSNING expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | SLÄPP RAD } ]
Lägger till datakvalitetsförväntningar i tabellen. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningstabellens händelselogg. En
FAIL UPDATE
förväntan gör att bearbetningen misslyckas när både tabellen skapas och tabellen uppdateras. EnDROP ROW
förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.expectation_expr
kan bestå av literaler, kolumnidentifierare i tabellen och deterministiska, inbyggda SQL-funktioner eller operatorer förutom:- Mängdfunktioner
- Analysfönsterfunktioner
- Funktioner för rangordningsfönster
- Generatorfunktioner för tabellvärde
Får inte heller
expr
innehålla någon underfråga.- Mängdfunktioner
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en informations primärnyckel eller informationsmässiga begränsningar för sekundärnyckel i en strömmande tabell. Viktiga begränsningar stöds inte för tabeller i
hive_metastore
katalogen.
-
-
table_clauses
Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett uppdateringsschema för den nya tabellen. Varje undersats kan bara anges en gång.
-
En valfri lista över kolumner i tabellen som tabellen ska partitioneras efter.
KOMMENTAR table_comment
En
STRING
literal för att beskriva tabellen.-
Du kan också ange en eller flera användardefinierade egenskaper.
SCHEMA [ UPPDATERA ] CRON cron_string [ AT TIME ZONE timezone_id ]
Om det tillhandahålls schemalägger du strömningstabellen eller den materialiserade vyn för att uppdatera sina data med det angivna cron-schemat för quartz . Endast time_zone_values accepteras.
AT TIME ZONE LOCAL
stöds inte. OmAT TIME ZONE
den saknas används tidszonen för sessionen. OmAT TIME ZONE
är frånvarande och sessionens tidszon inte har angetts utlöses ett fel.SCHEDULE
är semantiskt likvärdigt medSCHEDULE REFRESH
.Du kan inte använda syntaxen
SCHEDULE
i en pipelinedefinition för Delta Live Tables.Satsen
SCHEDULE
tillåts inte i ettCREATE OR REFRESH
kommando. Schemat kan anges som en del avCREATE
kommandot. Använd ALTER STREAMING TABLE för att ändra schemat för en strömningstabell när du har skapat den.
-
-
Den här satsen fyller i tabellen med hjälp av data från
query
. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet iSTREAM
valfri relation som du vill bearbeta stegvis. När du anger enquery
och entable_specification
tillsammans måste tabellschemat som anges itable_specification
innehålla alla kolumner som returneras avquery
, annars får du ett fel. Alla kolumner som anges itable_specification
men inte returneras avquery
returvärdennull
när du frågar.Den här satsen krävs för strömmande tabeller som skapats i Databricks SQL, men krävs inte i Delta Live Tables. Om den här satsen inte anges i Delta Live Tables måste du referera till den här tabellen i ett
APPLY CHANGES
kommando i DLT-pipelinen. Se Ändra datainsamling med SQL i Delta Live Tables.
Skillnader mellan strömmande tabeller och andra tabeller
Direktuppspelningstabeller är tillståndskänsliga tabeller som endast är utformade för att hantera varje rad en gång när du bearbetar en växande datauppsättning. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tabeller bra för de flesta inmatningsarbetsbelastningar. Strömmande tabeller är optimala för pipelines som kräver data färskhet och låg svarstid. Strömningstabeller kan också vara användbara för omfattande skalningstransformeringar, eftersom resultaten kan beräknas stegvis när nya data anländer, vilket håller resultaten uppdaterade utan att helt omkomplera alla källdata med varje uppdatering. Strömmande tabeller är utformade för datakällor som endast är tilläggstabeller.
Strömmande tabeller accepterar ytterligare kommandon, till exempel REFRESH
, som bearbetar de senaste data som är tillgängliga i de källor som anges i frågan. Ändringar i den angivna frågan återspeglas bara på nya data genom att anropa en REFRESH
, inte tidigare bearbetade data. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL
för att utföra en FULL REFRESH
. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.
Begränsningar
Endast tabellägare kan uppdatera strömmande tabeller för att hämta de senaste data.
ALTER TABLE
kommandon tillåts inte för strömmande tabeller. Tabellens definition och egenskaper bör ändras via -instruktionenALTER STREAMING TABLE
.Frågor om tidsresor stöds inte.
Utveckla tabellschemat via DML-kommandon som
INSERT INTO
, ochMERGE
stöds inte.Följande kommandon stöds inte i strömmande tabeller:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Deltadelning stöds inte.
Det går inte att byta namn på tabellen eller ändra ägaren.
Tabellbegränsningar som
PRIMARY KEY
ochFOREIGN KEY
stöds inte.Genererade kolumner, identitetskolumner och standardkolumner stöds inte.
Exempel
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');