CREATE STREAMING TABLE
Gilt für: Databricks SQL Databricks Runtime 13.3 LTS und höher
Wichtig
Dieses Feature befindet sich in der Public Preview.
Erstellt eine Streamingtabelle. Das ist eine Delta-Tabelle mit zusätzlicher Unterstützung von Streaming oder inkrementeller Datenverarbeitung.
Streamingtabellen werden nur in Delta Live Tables und in Databricks SQL mit Unity Catalog unterstützt. Wenn dieser Befehl für unterstützte Databricks Runtime-Computeressourcen ausgeführt wird, wird nur die Syntax analysiert. Weitere Informationen finden Sie unter Implementieren einer Delta Live Tables-Pipeline mit SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] |
WITH { ROW FILTER clause } } [...]
Parameter
REFRESH
Bei Angabe dieser Option wird die Tabelle mit den neuesten Daten aktualisiert, die aus den in der Abfrage definierten Quellen verfügbar sind. Nur neue Daten, die vor dem Start der Abfrage eingehen, werden verarbeitet. Neue Daten, die den Quellen während der Ausführung des Befehls hinzugefügt werden, werden bis zur nächsten Aktualisierung ignoriert.
IF NOT EXISTS
Wenn dies angegeben wird und bereits eine Tabelle gleichen Namens vorhanden ist, wird die Anweisung ignoriert.
IF NOT EXISTS
kann nicht zusammen mitREFRESH
verwendet werden, was bedeutet, dassCREATE OR REFRESH TABLE IF NOT EXISTS
nicht zulässig ist.-
Der Name der zu erstellenden Tabelle. Der Name darf keine temporale Spezifikation enthalten. Wenn der Name ohne Qualifizierung angegeben wird, wird die Tabelle im aktuellen Schema erstellt.
table_specification
Diese optionale Klausel definiert die Liste der Spalten, deren Typen, Eigenschaften, Beschreibungen und Spalteneinschränkungen.
Wenn Sie keine Spalten im Tabellenschema definieren, müssen Sie
AS query
angeben.-
Ein eindeutiger Name für die Spalte.
-
Gibt den Datentyp der Spalte an.
NOT NULL
Bei Angabe dieser Option akzeptiert die Spalte keine
NULL
-Werte.COMMENT column_comment
Ein Zeichenfolgenliteral zum Beschreiben der Spalte.
-
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt der Spalte in einer Streamingtabelle eine Primärschlüssel- oder Fremdschlüsseleinschränkung hinzu. Einschränkungen werden nicht für Tabellen im
hive_metastore
-Katalog unterstützt. -
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt eine Spaltenmaskierungsfunktion hinzu, um vertrauliche Zeichenfolgenwerte zu anonymisieren. Alle zukünftigen Abfragen aus dieser Spalte erhalten das Ergebnis der Auswertung dieser Funktion über der Spalte anstelle des ursprünglichen Werts der Spalte. Dies kann für eine fein abgestufte Zugriffssteuerung nützlich sein, bei der die Funktion die Identität oder Gruppenmitgliedschaften der aufrufenden Benutzer überprüfen kann, um zu entscheiden, ob der Wert zurückgezogen werden soll.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Fügt der Tabelle Datenqualitätserwartungen hinzu. Diese Datenqualitätserwartungen können im Laufe der Zeit nachverfolgt werden, und der Zugriff darauf ist über das Ereignisprotokoll der Streamingtabelle möglich. Die Erwartung
FAIL UPDATE
bewirkt, dass die Verarbeitung nicht erfolgreich ist, wenn die Tabelle sowohl erstellt als auch aktualisiert wird. Die ErwartungDROP ROW
bewirkt, dass die gesamte Zeile gelöscht wird, wenn die Erwartung nicht erfüllt wird.expectation_expr
kann aus Literalen, Spaltenbezeichnern innerhalb der Tabelle und deterministischen, integrierten SQL-Funktionen oder -Operatoren bestehen, mit Ausnahme von:- Aggregatfunktionen
- Analysefensterfunktionen
- Fensterrangfunktionen
- Tabellenwertgenerator-Funktionen
Darüber hinaus darf
expr
keine Unterabfrage enthalten.- Aggregatfunktionen
-
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt einer Streamingtabelle eine informative Primärschlüssel- oder Fremdschlüsseleinschränkung hinzu. Schlüsseleinschränkungen werden nicht für Tabellen im
hive_metastore
-Katalog unterstützt.
-
-
table_clauses
Geben Sie optional Partitionierung, Kommentare, benutzerdefinierte Eigenschaften und einen Aktualisierungszeitplan für die neue Tabelle an. Jede Unterklausel kann nur einmal angegeben werden.
-
Eine optionale Liste der Spalten der Tabelle, nach denen die Tabelle partitioniert werden soll.
COMMENT table_comment
Ein
STRING
-Literal zum Beschreiben der Tabelle.-
Legt optional eine oder mehrere benutzerdefinierte Eigenschaften fest.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Falls angegeben, legt dieser Parameter fest, dass die Daten der Streamingtabelle oder der materialisierten Sicht nach dem angegebenen quarz cron-Zeitplan aktualisiert werden. Es werden nur time_zone_values akzeptiert.
AT TIME ZONE LOCAL
wird nicht unterstützt. FehltAT TIME ZONE
, wird die Sitzungszeitzone verwendet. WennAT TIME ZONE
fehlt und die Sitzungszeitzone nicht festgelegt ist, wird ein Fehler ausgelöst.SCHEDULE
ist semantisch äquivalent mitSCHEDULE REFRESH
.Sie können die Syntax
SCHEDULE
nicht in einer Delta Live Tables-Pipelinedefinition verwenden.Die
SCHEDULE
-Klausel ist in einemCREATE OR REFRESH
-Befehl nicht zulässig. Der Zeitplan kann als Teil desCREATE
-Befehls bereitgestellt werden. Verwenden Sie ALTER STREAMING TABLE, um den Zeitplan einer Streamingtabelle nach der Erstellung zu ändern.WITH ROW FILTER-Klausel
Wichtig
Dieses Feature befindet sich in der Public Preview.
Fügt der Tabelle eine Zeilenfilterfunktion hinzu. Alle zukünftigen Abfragen aus dieser Tabelle erhalten eine Teilmenge der Zeilen, für die die Funktion als boolescher TRUE-Wert ausgewertet wird. Dies kann für eine fein abgestufte Zugriffssteuerung nützlich sein, bei der die Funktion die Identität oder Gruppenmitgliedschaften der aufrufenden Benutzer*innen überprüfen kann, um zu entscheiden, ob bestimmte Spalten gefiltert werden sollen.
-
AS query
Diese Klausel füllt die Tabelle mit den Daten aus
query
auf. Diese Abfrage muss eine Streamingabfrage sein. Dies kann erreicht werden, indem Sie jeder Beziehung, die Sie inkrementell verarbeiten möchten, das SchlüsselwortSTREAM
hinzufügen. Bei gemeinsamer Angabe vonquery
undtable_specification
muss das intable_specification
angegebene Tabellenschema alle vonquery
zurückgegebenen Spalten enthalten. Andernfalls tritt ein Fehler auf. Alle Spalten, die intable_specification
angegeben, aber nicht vonquery
zurückgegeben wurden, geben bei der Abfragenull
-Werte zurück.Diese Klausel ist für Streamingtabellen erforderlich, die in Databricks SQL erstellt wurden. In Delta Live Tables ist sie dagegen nicht erforderlich. Wenn diese Klausel in Delta Live Tables nicht angegeben wird, müssen Sie auf diese Tabelle in einem
APPLY CHANGES
-Befehl in Ihrer DLT-Pipeline verweisen. Weitere Informationen finden Sie unter Change data capture with SQL in Delta Live Tables.
Unterschiede zwischen Streamingtabellen und anderen Tabellen
Streamingtabellen sind zustandsbehaftete Tabellen, die so konzipiert sind, dass jede Zeile nur einmal verarbeitet wird, während Sie ein wachsendes Dataset verarbeiten. Da die meisten Datasets im Laufe der Zeit kontinuierlich wachsen, eignen sich Streamingtabellen für die meisten Erfassungsworkloads. Streamingtabellen sind optimal für Pipelines, die Datenfrische und geringe Wartezeiten erfordern. Streamingtabellen können auch für Transformationen im großen Stil nützlich sein, da die Ergebnisse beim Eintreffen neuer Daten inkrementell berechnet werden können, um die Ergebnisse auf dem neuesten Stand zu halten, ohne dass alle Quelldaten mit jedem Update vollständig neu verarbeitet werden müssen. Streamingtabellen sind für Datenquellen mit reinem Anfügen konzipiert.
Streamingtabellen akzeptieren zusätzliche Befehle wie REFRESH
. Mit diesem Befehl werden die neuesten Daten verarbeitet, die in den in der Abfrage angegebenen Quellen verfügbar sind. Änderungen an der angegebenen Abfrage werden nur in neuen Daten berücksichtigt, indem REFRESH
aufgerufen wird, aber nicht in zuvor verarbeiteten Daten. Sollen die Änderungen auch auf vorhandene Daten angewendet werden, müssen Sie REFRESH TABLE <table_name> FULL
ausführen, um eine vollständige Aktualisierung (FULL REFRESH
) zu erreichen. Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die nicht den gesamten Datenverlauf beibehalten oder kurze Aufbewahrungszeiträume aufweisen (z. B. Kafka), da durch eine vollständige Aktualisierung die vorhandenen Daten abgeschnitten werden. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.
Zeilenfilter und Spaltenmasken
Wichtig
Dieses Feature befindet sich in der Public Preview.
Mit Zeilenfiltern können Sie eine Funktion angeben, die als Filter gilt, wenn ein Tabellenscan Zeilen abruft. Mit diesen Filtern können Sie sicherstellen, dass nachfolgende Abfragen nur Zeilen zurückgeben, für die das Filterprädikat zu TRUE ausgewertet wird.
Mit Spaltenmasken können Sie die Werte einer Spalte immer filtern, wenn ein Tabellenscan Zeilen abruft. Alle zukünftigen Abfragen dieser Spalte erhalten das Ergebnis der Auswertung dieser Funktion für die Spalte, das den ursprünglichen Spaltenwert ersetzt.
Weitere Informationen zur Verwendung von Zeilenfiltern und Spaltenmasken finden Sie unter Filtern vertraulicher Tabellendaten mit Zeilenfiltern und Spaltenmasken.
Verwalten von Zeilenfiltern und Spaltenmasken
Zeilenfilter und Spaltenmasken für Streamingtabellen sollten der CREATE OR REFRESH
-Anweisung hinzugefügt, aktualisiert oder gelöscht werden.
Behavior
- Als Definierer aktualisieren: Wenn die
CREATE OR REFRESH
- oderREFRESH
-Anweisungen eine Streamingtabelle aktualisieren, werden Zeilenfilterfunktionen mit den Berechtigungen des Definierers (als Tabellenbesitzer) ausgeführt. Dies bedeutet, dass die Tabellenaktualisierung den Sicherheitskontext des Benutzers verwendet, der die Streamingtabelle erstellt hat. - Abfrage: Während die meisten Filter mit den Berechtigungen des Definierers ausgeführt werden, sind Funktionen wie
CURRENT_USER
undIS_MEMBER
, die den Benutzerkontext überprüfen, die Ausnahme. Diese Funktionen werden als Aufrufer ausgeführt. Durch diese Vorgehensweise werden benutzerspezifische Datensicherheit und Zugriffssteuerungen basierend auf dem Kontext des aktuellen Benutzers erzwungen.
Einblick
Verwenden Sie DESCRIBE EXTENDED
, INFORMATION_SCHEMA
oder den Katalog-Explorer, um die vorhandenen Zeilenfilter und Spaltenmasken zu untersuchen, die für eine bestimmte Streamingtabelle gelten. Mit dieser Funktionalität können Benutzer Datenzugriffs- und Schutzmaßnahmen für Streamingtabellen überwachen und überprüfen.
Begrenzungen
Nur Tabellenbesitzer*innen können Streamingtabellen aktualisieren, um die neuesten Daten abzurufen.
ALTER TABLE
-Befehle sind für Streamingtabellen nicht zulässig. Die Definition und die Eigenschaften der Tabelle sollten über dieCREATE OR REFRESH
- oderALTER STREAMING TABLE
-Anweisung geändert werden.Zeitreiseabfragen werden nicht unterstützt.
Die Weiterentwicklung des Tabellenschemas durch DML-Befehle wie
INSERT INTO
undMERGE
wird nicht unterstützt.Folgende Befehle werden für Streamingtabellen nicht unterstützt:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Delta Sharing wird nicht unterstützt.
Das Umbenennen der Tabelle oder das Ändern des Besitzers bzw. der Besitzerin wird nicht unterstützt.
Tabelleneinschränkungen wie
PRIMARY KEY
undFOREIGN KEY
werden nicht unterstützt.Generierte Spalten, Identitätsspalten und Standardspalten werden nicht unterstützt.
Beispiele
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')
Verwandte Artikel
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für