Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Databricks SQL
Erstellt eine Streamingtabelle. Das ist eine Delta-Tabelle mit zusätzlicher Unterstützung von Streaming oder inkrementeller Datenverarbeitung.
Streamingtabellen werden nur in Lakeflow Spark Declarative Pipelines und in Databricks SQL mit Unity Catalog unterstützt. Wenn dieser Befehl für unterstützte Databricks Runtime-Computeressourcen ausgeführt wird, wird nur die Syntax analysiert. Siehe Entwickeln von Lakeflow Spark Declarative Pipelines-Code mit SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
Parameter
REFRESH
Bei Angabe dieser Option wird die Tabelle mit den neuesten Daten aktualisiert, die aus den in der Abfrage definierten Quellen verfügbar sind. Nur neue Daten, die vor dem Start der Abfrage eingehen, werden verarbeitet. Neue Daten, die den Quellen während der Ausführung des Befehls hinzugefügt werden, werden bis zur nächsten Aktualisierung ignoriert. Der Aktualisierungsvorgang aus CREATE OR REFRESH ist vollständig deklarativ. Wenn ein Aktualisierungsbefehl nicht alle Metadaten aus der ursprünglichen Tabellenerstellungsanweisung angibt, werden die nicht angegebenen Metadaten gelöscht.
WENN NICHT EXISTIERT
Erstellt die Streamingtabelle, sofern sie noch nicht vorhanden ist. Wenn bereits eine Tabelle mit diesem Namen vorhanden ist, wird die
CREATE STREAMING TABLE-Anweisung ignoriert.Sie können nur eines von
IF NOT EXISTSoderOR REFRESHangeben.-
Der Name der zu erstellenden Tabelle. Der Name darf keine zeitliche Spezifikation oder Optionsspezifikation enthalten. Wenn der Name ohne Qualifizierung angegeben wird, wird die Tabelle im aktuellen Schema erstellt.
Tabellenspezifikation
Diese optionale Klausel definiert die Liste der Spalten, deren Typen, Eigenschaften, Beschreibungen und Spalteneinschränkungen.
Wenn Sie keine Spalten im Tabellenschema definieren, müssen Sie
AS queryangeben.-
Ein eindeutiger Name für die Spalte.
-
Gibt den Datentyp der Spalte an.
NICHT NULL
Bei Angabe dieser Option akzeptiert die Spalte keine
NULL-Werte.KOMMENTAR column_comment
Ein Zeichenfolgenliteral zum Beschreiben der Spalte.
-
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt der Spalte in einer Streamingtabelle eine Primärschlüssel- oder Fremdschlüsseleinschränkung hinzu. Einschränkungen werden nicht für Tabellen im
hive_metastore-Katalog unterstützt. -
Fügt eine Spaltenmaskierungsfunktion hinzu, um sensible Daten zu anonymisieren. Alle nachfolgenden Abfragen dieser Spalte erhalten das Ergebnis der Auswertung dieser Funktion anstelle des ursprünglichen Werts der Spalte. Dies kann für eine präzise Zugriffssteuerung nützlich sein, bei der die Funktion die Identität oder Gruppenmitgliedschaft der aufrufenden Benutzenden überprüfen kann, um zu entscheiden, ob der Wert zurückgezogen werden soll.
CONSTRAINT expectation_name ERWARTEN (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Fügt der Tabelle Datenqualitätserwartungen hinzu. Diese Erwartungen an die Datenqualität können im Laufe der Zeit nachverfolgt und über das Ereignisprotokoll der Streamingtabelle aufgerufen werden. Die Erwartung
FAIL UPDATEbewirkt, dass die Verarbeitung nicht erfolgreich ist, wenn die Tabelle sowohl erstellt als auch aktualisiert wird. Die ErwartungDROP ROWbewirkt, dass die gesamte Zeile gelöscht wird, wenn die Erwartung nicht erfüllt wird.expectation_exprkann aus Literalen, Spaltenbezeichnern innerhalb der Tabelle und deterministischen, integrierten SQL-Funktionen oder -Operatoren bestehen, mit Ausnahme von:-
Aggregatfunktionen
- Analysefensterfunktionen
- Rangfolgefensterfunktionen
- Tabellenwertgenerator-Funktionen
Darüber hinaus darf
exprkeine Unterabfrage enthalten.-
Aggregatfunktionen
-
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt einer Streamingtabelle eine informative Primärschlüssel- oder Fremdschlüsseleinschränkung hinzu. Schlüsseleinschränkungen werden nicht für Tabellen im
hive_metastore-Katalog unterstützt.
-
-
Tabellenklauseln
Geben Sie optional Partitionierung, Kommentare, benutzerdefinierte Eigenschaften und einen Aktualisierungszeitplan für die neue Tabelle an. Jede Unterklausel kann nur einmal angegeben werden.
-
Eine optionale Liste der Spalten der Tabelle, nach denen die Tabelle partitioniert werden soll.
Hinweis
Flüssigkeitsclustering bietet eine flexible, optimierte Lösung für Clustering. Erwägen Sie die Verwendung von
CLUSTER BYanstelle vonPARTITIONED BYfür Streamingtabellen. -
Eine optionale Klausel zum Gruppieren nach einer Teilmenge von Spalten. Verwenden Sie automatische Flüssigclustering mit
CLUSTER BY AUTO, und Databricks wählt intelligent Clustering-Schlüssel aus, um die Abfrageleistung zu optimieren. Siehe Verwenden von Flüssigclustering für Tabellen.Flüssigkeitsclustering kann nicht mit
PARTITIONED BYkombiniert werden. KOMMENTAR table_comment
Ein
STRING-Literal zum Beschreiben der Tabelle.STANDARD-KOLLATIONIERUNG UTF8_BINARY
Gilt für:
Databricks SQL
Databricks Runtime 17.1 and aboveErzwingt die Standardsortierung der Streaming-Tabelle auf
UTF8_BINARY. Diese Klausel ist verpflichtend, wenn das Schema, in dem die Tabelle erstellt wird, eine andere Standardsortierung alsUTF8_BINARYaufweist. Die Standardkollation der Streamingtabelle wird als Standardkollation innerhalb vonquerysowie für Spaltentypen verwendet.-
Legt optional eine oder mehrere benutzerdefinierte Eigenschaften fest.
Verwenden Sie diese Einstellung, um den Laufzeitkanal lakeflow Spark Declarative Pipelines anzugeben, der zum Ausführen dieser Anweisung verwendet wird. Legen Sie den Wert der
pipelines.channel-Eigenschaft auf"PREVIEW"oder"CURRENT"fest. Standardwert:"CURRENT". Weitere Informationen zu Lakeflow Spark Declarative Pipelines-Kanälen finden Sie unter Lakeflow Spark Declarative Pipelines-Laufzeitkanäle. Zeitplan
Der Zeitplan kann entweder eine
SCHEDULEAnweisung oder eineTRIGGERAnweisung sein.ZEITPLAN [ REFRESH ] Planungsklausel
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }Verwenden Sie die
EVERY-Syntax, um eine Aktualisierung zu planen, die in regelmäßigen Abständen auftritt. Wenn dieEVERY-Syntax angegeben ist, wird die Streamingtabelle oder materialisierte Sicht basierend auf dem angegebenen Wert (z. B.HOUR,HOURS,DAY,DAYS,WEEKoderWEEKS) regelmäßig im angegebenen Intervall aktualisiert. In der folgenden Tabelle finden Sie zulässige ganzzahlige Werte fürnumber.Zeiteinheit Ganzzahliger Wert HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 Hinweis
Die Singular- und Pluralformen der enthaltenen Zeiteinheit sind semantisch gleichwertig.
CRON cron_string [ AT TIME ZONE timezone_id ]Zum Planen einer Aktualisierung mithilfe eines quartz cron-Werts. Gültige time_zone_values-Werte werden akzeptiert.
AT TIME ZONE LOCALwird nicht unterstützt.Fehlt
AT TIME ZONE, wird die Sitzungszeitzone verwendet. WennAT TIME ZONEfehlt und die Sitzungszeitzone nicht festgelegt ist, wird ein Fehler ausgelöst.SCHEDULEist semantisch äquivalent mitSCHEDULE REFRESH.
Der Zeitplan kann als Teil des
CREATE-Befehls bereitgestellt werden. Verwenden Sie ALTER STREAMING TABLE oder führen SieCREATE OR REFRESHBefehl mitSCHEDULEKlausel aus, um den Zeitplan einer Streamingtabelle nach der Erstellung zu ändern.AUSLÖSEN AUF UPDATE [ HÖCHSTENS JEDES trigger_interval ]
Wichtig
Das
TRIGGER ON UPDATEFeature befindet sich in der Betaversion.Legen Sie optional fest, dass die Tabelle aktualisiert wird, wenn eine upstream-Datenquelle mindestens einmal pro Minute aktualisiert wird. Legen Sie einen Wert fest, für
AT MOST EVERYden mindestens eine Mindestzeit zwischen Aktualisierungen erforderlich ist.Die upstream-Datenquellen müssen externe oder verwaltete Delta-Tabellen (einschließlich materialisierter Ansichten oder Streamingtabellen) oder verwaltete Ansichten sein, deren Abhängigkeiten auf unterstützte Tabellentypen beschränkt sind.
Durch das Aktivieren von Dateiereignissen können Trigger leistungsleistungsfähiger werden und einige der Grenzwerte für Triggerupdates erhöht werden.
Dies
trigger_intervalist eine INTERVAL-Anweisung , die mindestens 1 Minute beträgt.TRIGGER ON UPDATEhat die folgenden Einschränkungen:- Nicht mehr als 10 upstream-Datenquellen pro Streamingtabelle bei Verwendung von TRIGGER ON UPDATE.
- Maximal 1000 Streamingtabellen oder materialisierte Ansichten können mit TRIGGER ON UPDATEangegeben werden.
- Die
AT MOST EVERYKlausel ist standardmäßig auf 1 Minute festgelegt und darf nicht kleiner als 1 Minute sein.
-
WITH ROW FILTER-Klausel
Fügt der Tabelle eine Zeilenfilterfunktion hinzu. Alle nachfolgenden Abfragen aus dieser Tabelle erhalten eine Teilmenge der Zeilen, bei denen die Funktion zu boolean TRUE ausgewertet wird. Dies kann für eine präzise Zugriffssteuerung nützlich sein, bei der die Funktion die Identität oder Gruppenmitgliedschaft der aufrufenden Benutzenden überprüfen kann, um zu entscheiden, ob bestimmte Zeilen gefiltert werden sollen.
-
Diese Klausel füllt die Tabelle mit den Daten aus
queryauf. Diese Abfrage muss eine Streamingabfrage sein. Dies kann erreicht werden, indem Sie jeder Beziehung, die Sie inkrementell verarbeiten möchten, das SchlüsselwortSTREAMhinzufügen. Bei gemeinsamer Angabe vonqueryundtable_specificationmuss das intable_specificationangegebene Tabellenschema alle vonqueryzurückgegebenen Spalten enthalten. Andernfalls tritt ein Fehler auf. Alle Spalten, die intable_specificationangegeben, aber nicht vonqueryzurückgegeben wurden, geben bei der Abfragenull-Werte zurück.
Unterschiede zwischen Streamingtabellen und anderen Tabellen
Streamingtabellen sind zustandsbehaftete Tabellen, die so konzipiert sind, dass jede Zeile nur einmal verarbeitet wird, während Sie ein wachsendes Dataset verarbeiten. Da die meisten Datasets im Laufe der Zeit kontinuierlich wachsen, eignen sich Streamingtabellen für die meisten Erfassungsworkloads. Streamingtabellen sind optimal für Pipelines, die Datenfrische und geringe Wartezeiten erfordern. Streamingtabellen können auch für Transformationen im großen Stil nützlich sein, da die Ergebnisse beim Eintreffen neuer Daten inkrementell berechnet werden können, um die Ergebnisse auf dem neuesten Stand zu halten, ohne dass alle Quelldaten mit jedem Update vollständig neu verarbeitet werden müssen. Streaming-Tabellen sind für Datenquellen konzipiert, die nur hinzugefügt werden.
Streamingtabellen akzeptieren zusätzliche Befehle wie REFRESH. Mit diesem Befehl werden die neuesten Daten verarbeitet, die in den in der Abfrage angegebenen Quellen verfügbar sind. Änderungen an der angegebenen Abfrage werden nur in neuen Daten berücksichtigt, indem REFRESH aufgerufen wird, aber nicht in zuvor verarbeiteten Daten. Sollen die Änderungen auch auf vorhandene Daten angewendet werden, müssen Sie REFRESH TABLE <table_name> FULL ausführen, um eine Aktualisierung (FULL REFRESH) zu erreichen. Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die den gesamten Verlauf der Daten nicht beibehalten oder kurze Aufbewahrungsfristen haben, z. B. Kafka, da die vollständige Aktualisierung die vorhandenen Daten abschneidet. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.
Zeilenfilter und Spaltenmasken
Mit Zeilenfiltern können Sie eine Funktion angeben, die als Filter gilt, wenn ein Tabellenscan Zeilen abruft. Mit diesen Filtern können Sie sicherstellen, dass nachfolgende Abfragen nur Zeilen zurückgeben, für die das Filterprädikat zu TRUE ausgewertet wird.
Mit Spaltenmasken können Sie die Werte einer Spalte maskieren, wann immer ein Tabellenscan Zeilen abruft. Alle zukünftigen Abfragen, die diese Spalte einbeziehen, erhalten das Ergebnis der Auswertung der Funktion über die Spalte, wobei der ursprüngliche Wert der Spalte ersetzt wird.
Weitere Informationen zur Verwendung von Zeilenfiltern und Spaltenformaten finden Sie unter Zeilenfilter und Spaltenformate.
Verwalten von Zeilenfiltern und Spaltenmasken
Zeilenfilter und Spaltenmasken für Streamingtabellen sollten der CREATE OR REFRESH-Anweisung hinzugefügt, aktualisiert oder gelöscht werden.
Verhalten
-
Als Definer aktualisieren: Wenn die
CREATE OR REFRESHoderREFRESHAnweisungen eine Streamingtabelle aktualisieren, werden Zeilenfilterfunktionen mit den Rechten des Definers (als Tabellenbesitzer) ausgeführt. Dies bedeutet, dass die Tabellenaktualisierung den Sicherheitskontext des Benutzers verwendet, der die Streamingtabelle erstellt hat. -
Abfrage: Während die meisten Filter mit den Rechten des Definers ausgeführt werden, sind Funktionen, die den Benutzerkontext (z
CURRENT_USER. B. undIS_MEMBER) überprüfen, Ausnahmen. Diese Funktionen werden als Aufrufer ausgeführt. Dieser Ansatz erzwingt benutzerspezifische Datensicherheit und Zugriffssteuerungen basierend auf dem Kontext des aktuellen Benutzers.
Beobachtbarkeit
Verwenden Sie DESCRIBE EXTENDED, INFORMATION_SCHEMA oder den Katalog-Explorer, um die vorhandenen Zeilenfilter und Spaltenmasken zu untersuchen, die für eine bestimmte Streamingtabelle gelten. Mit dieser Funktionalität können Benutzende Datenzugriffs- und Schutzmaßnahmen für Streamingtabellen überwachen und überprüfen.
Einschränkungen
- Nur Tabellenbesitzende können Streamingtabellen aktualisieren, um die neuesten Daten abzurufen.
-
ALTER TABLE-Befehle sind für Streamingtabellen nicht zulässig. Die Definition und die Eigenschaften der Tabelle sollten durch dieCREATE OR REFRESH- oder ALTER STREAMING TABLE-Anweisung geändert werden. - Die Weiterentwicklung des Tabellenschemas durch DML-Befehle wie
INSERT INTOundMERGEwird nicht unterstützt. - Die folgenden Befehle werden für Streamingtabellen nicht unterstützt:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- Delta Sharing wird nicht unterstützt.
- Das Umbenennen der Tabelle oder das Ändern des Besitzers bzw. der Besitzerin wird nicht unterstützt.
- Tabelleneinschränkungen wie
PRIMARY KEYundFOREIGN KEYwerden für Streamingtabellen imhive_metastore-Katalog nicht unterstützt. - Generierte Spalten, Identitätsspalten und Standardspalten werden nicht unterstützt.
Beispiele
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')