Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Databricks empfiehlt die Verwendung von Streamingtabellen zum Erfassen von Daten mit Databricks SQL. Eine Streamingtabelle ist eine bei Unity Catalog registrierte Tabelle mit zusätzlicher Unterstützung für Streaming oder inkrementelle Datenverarbeitung. Für jede Streamingtabelle wird automatisch eine Pipeline erstellt. Sie können Streamingtabellen für das inkrementelle Laden von Daten von Kafka und vom Cloudobjektspeicher verwenden.
Hinweis
Informationen darüber, wie Delta Lake-Tabellen als Streamingquellen und Streaming-Senken genutzt werden, finden Sie unter Delta-Tabellen-Streaming: Lese- und Schreibvorgänge.
Anforderungen
Um Streamingtabellen zu verwenden, müssen Sie die folgenden Anforderungen erfüllen.
Anforderungen an den Arbeitsbereich:
Streamingtabellen, die in Databricks SQL erstellt wurden, werden von serverlosen deklarativen Lakeflow-Pipelines unterstützt. Ihr Arbeitsbereich muss serverlose Pipelines unterstützen, um diese Funktionalität zu verwenden.
- Ein Azure Databricks-Konto mit aktiviertem serverlosen Konto. Weitere Informationen finden Sie unter Aktivieren von serverlosen SQL-Warehouses.
- Ein Arbeitsbereich, für den Unity Catalog aktiviert ist. Weitere Informationen finden Sie unter "Erste Schritte mit Unity-Katalog".
Computeanforderungen:
Sie müssen eine der folgenden Voraussetzungen verwenden:
- Ein SQL-Warehouse, das den
Current
-Kanal verwendet. - Berechnungen mit dem Standardzugriffsmodus (früher gemeinsam genutzter Zugriffsmodus) auf Databricks Runtime 13.3 LTS oder höher.
Berechnen Sie mit dem dedizierten Zugriffsmodus (früher Einzelbenutzer-Zugriffsmodus) auf Databricks Runtime 15.4 LTS oder höher.
Unter Databricks Runtime 15.3 und unten können Sie keine dedizierte Berechnung verwenden, um Streamingtabellen abzufragen, die im Besitz anderer Benutzer sind. Sie können dedizierte Berechnungskapazität auf Databricks Runtime 15.3 und früheren Versionen nur verwenden, wenn Sie die Streamingtabelle besitzen. Der Ersteller der Tabelle ist der Besitzer.
Databricks Runtime 15.4 LTS und höher unterstützen das Abfragen von Tabellen, die durch Lakeflow Declarative Pipelines erzeugt wurden, auf einem dedizierten Compute, auch wenn Sie nicht der Tabellenbesitzer sind. Sie könnten für serverlose Rechnerressourcen belastet werden, wenn Sie dedizierte Rechnerkapazitäten verwenden, um Datenfiltervorgänge auszuführen. Siehe Feingranulare Zugriffssteuerung auf dedizierten Rechenressourcen.
Berechtigungsanforderungen:
- Die Berechtigungen
USE CATALOG
undUSE SCHEMA
für den Katalog und das Schema, wo Sie die Streamingtabelle erstellen. - Die
CREATE TABLE
-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen. - Berechtigungen für den Zugriff auf die Tabellen oder Speicherorte, die die Quelldaten für Ihre Streamingtabelle bereitstellen.
Erstellen von Streamingtabellen
Eine Streamingtabelle wird durch eine SQL-Abfrage in Databricks SQL definiert. Wenn Sie eine Streamingtabelle erstellen, werden die daten, die sich derzeit in den Quelltabellen befinden, verwendet, um die Streamingtabelle zu erstellen. Danach aktualisieren Sie die Tabelle, in der Regel nach einem Zeitplan, um alle hinzugefügten Daten in den Quelltabellen abzurufen, um sie an die Streamingtabelle anzufügen.
Wenn Sie eine Streamingtabelle erstellen, werden Sie als Besitzer der Tabelle betrachtet.
Um eine Streamingtabelle aus einer vorhandenen Tabelle zu erstellen, verwenden Sie die CREATE STREAMING TABLE Anweisung wie im folgenden Beispiel:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
In diesem Fall wird die Streamingtabelle sales
aus bestimmten Spalten der raw_data
Tabelle mit einem Zeitplan erstellt, der jede Stunde aktualisiert werden soll. Die verwendete Abfrage muss eine Streamingabfrage sein. Verwenden Sie das STREAM
Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden.
Wenn Sie eine Streamingtabelle mit der CREATE OR REFRESH STREAMING TABLE
Anweisung erstellen, beginnen die anfängliche Datenaktualisierung und -population sofort. Diese Vorgänge verbrauchen keine DBSQL Warehouse Compute. Stattdessen basieren Streamingtabellen auf serverlosen, Lakeflow-deklarativen Pipelines zur Erstellung und Aktualisierung. Eine dedizierte serverlose Pipeline wird automatisch vom System für jede Streamingtabelle erstellt und verwaltet.
Laden von Dateien mit automatischem Ladeprogramm
Um eine Streamingtabelle aus Dateien in einem Volume zu erstellen, verwenden Sie das automatische Laden. Verwenden Sie "Auto Loader" mit "Lakeflow Declarative Pipelines" für die meisten Datenaufnahmeaufgaben aus dem Cloudobjektspeicher. Auto Loader und Lakeflow Declarative Pipelines sind so konzipiert, dass sie fortlaufend und idempotent ständig wachsende Daten laden, sobald diese im Cloudspeicher eintreffen.
Verwenden Sie die read_files
Funktion, um auto Loader in Databricks SQL zu verwenden. Die folgenden Beispiele zeigen die Verwendung von Auto Loader zum Lesen eines Volumens von JSON-Dateien in eine Streamingtabelle:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
Um Daten aus cloudbasiertem Speicher zu lesen, können Sie auch das automatische Laden verwenden:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Weitere Informationen zum automatischen Laden finden Sie unter "Was ist auto Loader?" Weitere Informationen zur Verwendung des automatischen Ladeprogramms in SQL mit Beispielen finden Sie unter Laden von Daten aus objektspeicher.
Erfassung per Streaming aus anderen Quellen
Ein Beispiel für die Erfassung aus anderen Quellen, einschließlich Kafka, finden Sie unter Laden von Daten mit Lakeflow Declarative Pipelines.
Nur neue Daten aufnehmen
Standardmäßig liest die read_files
-Funktion alle vorhandenen Daten im Quellverzeichnis während der Tabellenerstellung und verarbeitet dann neu eingehende Datensätze mit jeder Aktualisierung.
Um das Aufnehmen von Daten zu vermeiden, die zum Zeitpunkt der Tabellenerstellung bereits im Quellverzeichnis vorhanden sind, legen Sie die Option includeExistingFiles
auf false
fest. Dies bedeutet, dass nur Daten verarbeitet werden, die nach der Tabellenerstellung im Verzeichnis eingehen. Beispiel:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
Festlegen des Laufzeitkanals
Streamingtabellen, die mit SQL-Lagerhäusern erstellt wurden, werden automatisch mithilfe einer Pipeline aktualisiert. Lakeflow Declarative Pipelines verwenden standardmäßig die Laufzeit im current
Kanal. Lesen Sie die Versionshinweise zu Lakeflow Declarative Pipelines und den Releaseupgradeprozess , um mehr über den Veröffentlichungsprozess zu erfahren.
Databricks empfiehlt die Verwendung des current
Kanals für Produktionsworkloads. Neue Features werden zuerst im preview
Kanal veröffentlicht. Sie können eine Pipeline auf den Vorschaukanal von Lakeflow Declarative Pipelines setzen, um neue Features zu testen, indem Sie preview
als Tabelleneigenschaft angeben. Sie können diese Eigenschaft angeben, wenn Sie die Tabelle erstellen oder nachdem die Tabelle mithilfe einer ALTER-Anweisung erstellt wurde.
Das folgende Codebeispiel zeigt, wie Sie den Kanal in einer CREATE-Anweisung auf eine Vorschau festlegen:
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
Ausblenden vertraulicher Daten
Von Bedeutung
Dieses Feature befindet sich in der Public Preview.
Sie können Streamingtabellen verwenden, um vertrauliche Daten von Benutzern auszublenden, die auf die Tabelle zugreifen. Ein Ansatz besteht darin, die Abfrage so zu definieren, dass vertrauliche Spalten oder Zeilen vollständig ausgeschlossen werden. Alternativ können Sie Spaltenmasken oder Zeilenfilter basierend auf den Berechtigungen des Abfragebenutzers anwenden. Beispielsweise könnten Sie die tax_id
Spalte für Benutzer ausblenden, die sich nicht in der Gruppe HumanResourcesDept
befinden. Verwenden Sie dazu die ROW FILTER
und MASK
Syntax während der Erstellung der Streamingtabelle. Weitere Informationen finden Sie unter Zeilenfilter und Spaltenmasken.
Aktualisieren einer Streamingtabelle
Aktualisierungen können automatisch geplant werden, wenn Sie die Streamingtabelle erstellen. Sie können Streamingtabellen auch manuell aktualisieren. Auch wenn Sie über eine geplante Aktualisierung verfügen, können Sie jederzeit eine manuelle Aktualisierung aufrufen. Aktualisierungen werden von derselben Pipeline behandelt, die automatisch zusammen mit der Streamingtabelle erstellt wurde.
So aktualisieren Sie eine Streamingtabelle:
REFRESH STREAMING TABLE sales;
Sie können den Status der neuesten Aktualisierung mit DESCRIBE TABLE EXTENDED überprüfen.
Hinweis
Nur der Tabellenbesitzer kann eine Streamingtabelle aktualisieren, um die neuesten Daten abzurufen. Der Benutzer, der die Tabelle erstellt, ist der Besitzer, und der Besitzer kann nicht geändert werden. Möglicherweise müssen Sie Ihre Streamingtabelle aktualisieren, bevor Sie Zeitreise-Abfragen verwenden.
Funktionsweise der Aktualisierung
Eine Aktualisierung einer Streamingtabelle wertet nur neue Zeilen aus, die seit der letzten Aktualisierung eingegangen sind, und fügt nur die neuen Daten an.
Jede Aktualisierung verwendet die aktuelle Definition der Streamingtabelle, um diese neuen Daten zu verarbeiten. Das Ändern einer Streamingtabellendefinition berechnet vorhandene Daten nicht automatisch neu. Wenn eine Änderung nicht mit vorhandenen Daten kompatibel ist (z. B. ändern eines Datentyps), schlägt die nächste Aktualisierung mit einem Fehler fehl.
In den folgenden Beispielen wird erläutert, wie sich Änderungen an einer Streamingtabellendefinition auf das Aktualisierungsverhalten auswirken:
- Durch das Entfernen eines Filters werden zuvor gefilterte Zeilen nicht erneut verarbeitet.
- Das Ändern von Spaltenprojektionen wirkt sich nicht darauf aus, wie vorhandene Daten verarbeitet wurden.
- Verknüpfungen mit statischen Momentaufnahmen verwenden den Momentaufnahmezustand zum Zeitpunkt der anfänglichen Verarbeitung. Spät ankommende Daten, die mit der aktualisierten Momentaufnahme übereinstimmen würden, werden ignoriert. Dies kann dazu führen, dass Fakten ignoriert werden, wenn Dimensionen verspätet sind.
- Das Ändern des CAST einer vorhandenen Spalte führt zu einem Fehler.
Wenn sich Ihre Daten auf eine Weise ändern, die in der vorhandenen Streamingtabelle nicht unterstützt werden kann, können Sie eine vollständige Aktualisierung durchführen.
Vollständiges Aktualisieren einer Streamingtabelle
Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die den gesamten Verlauf der Daten nicht beibehalten oder kurze Aufbewahrungsfristen haben, z. B. Kafka, da die vollständige Aktualisierung die vorhandenen Daten abschneidet. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.
Beispiel:
REFRESH STREAMING TABLE sales FULL;
Ändern des Zeitplans für eine Streamingtabelle
Sie können einen automatischen Aktualisierungszeitplan für Ihre Streamingtabelle ändern (oder festlegen). Die folgenden Beispiele zeigen Ihnen, wie Sie einen Zeitplan mithilfe von ALTER STREAMING TABLE erstellen:
ALTER STREAMING TABLE sales
ADD SCHEDULE every 1 hour;
Ein Beispiel für Abfragen von Aktualisierungszeitplänen finden Sie unter ALTER STREAMING TABLE.
Nachverfolgen des Status einer Aktualisierung
Sie können den Status einer Aktualisierung einer Streamingtabelle anzeigen, indem Sie die Pipeline anzeigen, die die Streamingtabelle in der Benutzeroberfläche der Deklarativen Pipelines von Lakeflow verwaltet, oder indem Sie die vom Befehl für die Streamingtabelle zurückgegebenen DESCRIBE EXTENDED
anzeigen.
DESCRIBE TABLE EXTENDED <table-name>;
Alternativ können Sie die Streamingtabelle im Katalog-Explorer anzeigen und den Aktualisierungsstatus dort anzeigen:
- Klicken Sie auf
Katalog in der Randleiste.
- Öffnen Sie im Katalog-Explorer-Baum links den Katalog, und wählen Sie das Schema aus, in dem sich die Streamingtabelle befindet.
- Öffnen Sie das Tabellenelement unter dem ausgewählten Schema, und klicken Sie auf die Streamingtabelle.
Von hier aus können Sie die Registerkarten unter dem Namen der Streamingtabelle verwenden, um Informationen zur Streamingtabelle anzuzeigen und zu bearbeiten, einschließlich:
- Status und Verlauf aktualisieren
- Das Tabellenschema
- Beispieldaten (erfordert eine aktive Berechnung)
- Erlaubnisse
- Datenherkunft, einschließlich Tabellen und Pipelines, von denen diese Streamingtabelle abhängt
- Einblicke in die Nutzung
- Monitore, die Sie für diese Streaming-Tabelle erstellt haben
Steuern des Zugriffs auf Streamingtabellen
Streamingtabellen unterstützen umfassende Zugriffskontrollen, um die gemeinsame Nutzung von Daten zu ermöglichen, ohne potenziell private Daten offenzulegen. Ein Besitzer einer Streamingtabelle oder ein Benutzer mit dem MANAGE
-Recht kann anderen Benutzern SELECT
-Rechte gewähren. Benutzer mit SELECT
Zugriff auf die Streamingtabelle benötigen SELECT
keinen Zugriff auf die Tabellen, auf die von der Streamingtabelle verwiesen wird. Diese Zugriffskontrolle ermöglicht das Teilen von Daten und steuert gleichzeitig den Zugriff auf die zugrunde liegenden Daten.
Gewähren von Berechtigungen für eine Streamingtabelle
Verwenden Sie die GRANT Anweisung, um Zugriff auf eine Streamingtabelle zu gewähren:
GRANT <privilege_type> ON <st_name> TO <principal>;
Dies privilege_type
kann sein:
SELECT
– Die benutzende Person kann für die Streamingtabelle Folgendes ausführen:SELECT
.REFRESH
– Die benutzende Person kann für die Streamingtabelle Folgendes ausführen:REFRESH
. Aktualisierungen werden mit den Berechtigungen des Besitzers ausgeführt.
Im folgenden Beispiel wird eine Streamingtabelle erstellt und Benutzern Auswahl- und Aktualisierungsberechtigungen gewährt:
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Weitere Informationen zum Gewähren von Berechtigungen für sicherungsfähige Unity Catalog-Objekte finden Sie unter Unity Catalog-Berechtigungen und sicherungsfähige Objekte.
Widerrufen von Berechtigungen einer Streamingtabelle
Verwenden Sie die Anweisung REVOKE, um den Zugriff auf eine Streamingtabelle zu widerrufen.
REVOKE privilege_type ON <st_name> FROM principal;
Wenn SELECT
Berechtigungen für eine Quelltabelle vom Besitzer der Streamingtabelle oder einem anderen Benutzer entzogen werden, der MANAGE
oder SELECT
Berechtigungen für die Streamingtabelle erhalten hat, oder die Quelltabelle gelöscht wird, kann der Besitzer der Streamingtabelle oder der Benutzer, dem Zugriff gewährt wurde, weiterhin die Streamingtabelle abfragen. Es kommt jedoch zu folgendem Verhalten:
- Die Person, die die Streamingtabelle besitzt, oder andere Personen, die den Zugriff auf eine Streamingtabelle verloren haben, können für diese Streamingtabelle nicht mehr
REFRESH
ausführen, und die Streamingtabelle wird veraltet. - Wenn die Automatisierung per Zeitplan erfolgt, schlägt der nächste geplante
REFRESH
-Vorgang fehl oder wird nicht ausgeführt.
Im folgenden Beispiel wird SELECT
die read_only_user
-Berechtigung entzogen:
REVOKE SELECT ON st_name FROM read_only_user;
Dauerhaftes Löschen von Datensätzen aus einer Streamingtabelle
Von Bedeutung
Die Unterstützung für die REORG
-Anweisung mit Streamingtabellen befindet sich in der Public Preview.
Hinweis
- Die Verwendung einer
REORG
Anweisung mit einer Streamingtabelle erfordert Databricks Runtime 15.4 und höher. - Obwohl Sie die
REORG
Anweisung mit einer beliebigen Streamingtabelle verwenden können, ist sie nur erforderlich, wenn Datensätze aus einer Streamingtabelle mit aktivierten Löschvektoren gelöscht werden. Der Befehl hat keine Auswirkung, wenn er mit einer Streamingtabelle ohne aktivierte Löschvektoren verwendet wird.
Um Datensätze physisch aus dem zugrunde liegenden Speicher für eine Streamingtabelle mit aktivierten Löschvektoren zu löschen, z. B. für die DSGVO-Compliance, müssen zusätzliche Schritte ausgeführt werden, um sicherzustellen, dass ein VACUUM Vorgang in den Daten der Streamingtabelle ausgeführt wird.
So löschen Sie Datensätze physisch aus dem zugrunde liegenden Speicher:
- Aktualisieren von Datensätzen oder Löschen von Datensätzen aus der Streamingtabelle.
- Führen Sie eine
REORG
Anweisung für die Streamingtabelle aus, die denAPPLY (PURGE)
Parameter angibt. Beispiel:REORG TABLE <streaming-table-name> APPLY (PURGE);
. - Warten Sie, bis der Datenaufbewahrungszeitraum der Streamingtabelle überschritten wird. Der Standardzeitraum für die Datenaufbewahrung beträgt sieben Tage, kann jedoch mit der
delta.deletedFileRetentionDuration
Tabelleneigenschaft konfiguriert werden. Siehe Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen. - Führen Sie
REFRESH
für die Streamingtabelle aus. Siehe Aktualisieren einer Streamingtabelle. Innerhalb von 24 Stunden nach demREFRESH
Vorgang werden Wartungsaufgaben von Lakeflow Declarative Pipelines, einschließlich des Vorgangs, derVACUUM
erforderlich ist, um sicherzustellen, dass Datensätze dauerhaft gelöscht werden, automatisch ausgeführt.
Überwachen der Ausführung mithilfe des Abfrageverlaufs
Sie können die Abfrageverlaufsseite verwenden, um auf Abfragedetails und Abfrageprofile zuzugreifen, die Ihnen helfen können, schlecht ausgeführte Abfragen und Engpässe in den Deklarativen Pipelines von Lakeflow zu identifizieren, mit denen Ihre Streamingtabellenaktualisierungen ausgeführt werden. Eine Übersicht über die Art der informationen, die in Abfragehistorien und Abfrageprofilen verfügbar sind, finden Sie unter "Abfrageverlauf " und "Abfrageprofil".
Von Bedeutung
Dieses Feature befindet sich in der Public Preview. Arbeitsbereichsadministratoren und -administratorinnen können dieses Feature über die Seite Vorschau aktivieren. Siehe Verwalten von Azure Databricks Previews.
Alle Anweisungen im Zusammenhang mit Streamingtabellen werden im Abfrageverlauf angezeigt. Sie können das Dropdown-Menü "Statement" verwenden, um einen beliebigen Befehl auszuwählen und die zugehörigen Abfragen zu überprüfen. Auf alle CREATE
Anweisungen folgt eine REFRESH
Anweisung, die asynchron in einer Pipeline ausgeführt wird. Die REFRESH
Anweisungen enthalten in der Regel detaillierte Abfragepläne, die Einblicke in die Optimierung der Leistung bieten.
Führen Sie die folgenden Schritte aus, um auf REFRESH
Anweisungen in der Query History UI zuzugreifen:
- Klicken Sie auf das
Öffnen Sie in der linken Randleiste die Benutzeroberfläche für den Abfrageverlauf .
- Aktivieren Sie das Kontrollkästchen REFRESH im Dropdownfilter Anweisung.
- Klicken Sie auf den Namen der Abfrage-Anweisung, um Zusammenfassungsdetails wie die Dauer der Abfrage und aggregierte Metriken anzuzeigen.
- Klicken Sie auf " Abfrageprofil anzeigen", um das Abfrageprofil zu öffnen. Details zum Navigieren im Abfrageprofil finden Sie im Abfrageprofil .
- Optional können Sie die Verknüpfungen im Abschnitt "Abfragequelle " verwenden, um die zugehörige Abfrage oder Pipeline zu öffnen.
Sie können auch mithilfe von Links im SQL-Editor oder aus einem Notizbuch, das an ein SQL Warehouse angefügt ist, auf Abfragedetails zugreifen.