Freigeben über


Verwenden von Streamingtabellen in Databricks SQL

Databricks empfiehlt die Verwendung von Streamingtabellen zum Erfassen von Daten mit Databricks SQL. Eine Streamingtabelle ist eine bei Unity Catalog registrierte Tabelle mit zusätzlicher Unterstützung für Streaming oder inkrementelle Datenverarbeitung. Für jede Streamingtabelle wird automatisch eine Pipeline erstellt. Sie können Streamingtabellen für das inkrementelle Laden von Daten von Kafka und vom Cloudobjektspeicher verwenden.

Hinweis

Informationen darüber, wie Delta Lake-Tabellen als Streamingquellen und Streaming-Senken genutzt werden, finden Sie unter Delta-Tabellen-Streaming: Lese- und Schreibvorgänge.

Anforderungen

Um Streamingtabellen zu verwenden, müssen Sie die folgenden Anforderungen erfüllen.

Anforderungen an den Arbeitsbereich:

Streamingtabellen, die in Databricks SQL erstellt wurden, werden von serverlosen deklarativen Lakeflow-Pipelines unterstützt. Ihr Arbeitsbereich muss serverlose Pipelines unterstützen, um diese Funktionalität zu verwenden.

Computeanforderungen:

Sie müssen eine der folgenden Voraussetzungen verwenden:

  • Ein SQL-Warehouse, das den Current-Kanal verwendet.
  • Berechnungen mit dem Standardzugriffsmodus (früher gemeinsam genutzter Zugriffsmodus) auf Databricks Runtime 13.3 LTS oder höher.
  • Berechnen Sie mit dem dedizierten Zugriffsmodus (früher Einzelbenutzer-Zugriffsmodus) auf Databricks Runtime 15.4 LTS oder höher.

    Unter Databricks Runtime 15.3 und unten können Sie keine dedizierte Berechnung verwenden, um Streamingtabellen abzufragen, die im Besitz anderer Benutzer sind. Sie können dedizierte Berechnungskapazität auf Databricks Runtime 15.3 und früheren Versionen nur verwenden, wenn Sie die Streamingtabelle besitzen. Der Ersteller der Tabelle ist der Besitzer.

    Databricks Runtime 15.4 LTS und höher unterstützen das Abfragen von Tabellen, die durch Lakeflow Declarative Pipelines erzeugt wurden, auf einem dedizierten Compute, auch wenn Sie nicht der Tabellenbesitzer sind. Sie könnten für serverlose Rechnerressourcen belastet werden, wenn Sie dedizierte Rechnerkapazitäten verwenden, um Datenfiltervorgänge auszuführen. Siehe Feingranulare Zugriffssteuerung auf dedizierten Rechenressourcen.

Berechtigungsanforderungen:

  • Die Berechtigungen USE CATALOG und USE SCHEMA für den Katalog und das Schema, wo Sie die Streamingtabelle erstellen.
  • Die CREATE TABLE-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen.
  • Berechtigungen für den Zugriff auf die Tabellen oder Speicherorte, die die Quelldaten für Ihre Streamingtabelle bereitstellen.

Erstellen von Streamingtabellen

Eine Streamingtabelle wird durch eine SQL-Abfrage in Databricks SQL definiert. Wenn Sie eine Streamingtabelle erstellen, werden die daten, die sich derzeit in den Quelltabellen befinden, verwendet, um die Streamingtabelle zu erstellen. Danach aktualisieren Sie die Tabelle, in der Regel nach einem Zeitplan, um alle hinzugefügten Daten in den Quelltabellen abzurufen, um sie an die Streamingtabelle anzufügen.

Wenn Sie eine Streamingtabelle erstellen, werden Sie als Besitzer der Tabelle betrachtet.

Um eine Streamingtabelle aus einer vorhandenen Tabelle zu erstellen, verwenden Sie die CREATE STREAMING TABLE Anweisung wie im folgenden Beispiel:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

In diesem Fall wird die Streamingtabelle sales aus bestimmten Spalten der raw_data Tabelle mit einem Zeitplan erstellt, der jede Stunde aktualisiert werden soll. Die verwendete Abfrage muss eine Streamingabfrage sein. Verwenden Sie das STREAM Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden.

Wenn Sie eine Streamingtabelle mit der CREATE OR REFRESH STREAMING TABLE Anweisung erstellen, beginnen die anfängliche Datenaktualisierung und -population sofort. Diese Vorgänge verbrauchen keine DBSQL Warehouse Compute. Stattdessen basieren Streamingtabellen auf serverlosen, Lakeflow-deklarativen Pipelines zur Erstellung und Aktualisierung. Eine dedizierte serverlose Pipeline wird automatisch vom System für jede Streamingtabelle erstellt und verwaltet.

Laden von Dateien mit automatischem Ladeprogramm

Um eine Streamingtabelle aus Dateien in einem Volume zu erstellen, verwenden Sie das automatische Laden. Verwenden Sie "Auto Loader" mit "Lakeflow Declarative Pipelines" für die meisten Datenaufnahmeaufgaben aus dem Cloudobjektspeicher. Auto Loader und Lakeflow Declarative Pipelines sind so konzipiert, dass sie fortlaufend und idempotent ständig wachsende Daten laden, sobald diese im Cloudspeicher eintreffen.

Verwenden Sie die read_files Funktion, um auto Loader in Databricks SQL zu verwenden. Die folgenden Beispiele zeigen die Verwendung von Auto Loader zum Lesen eines Volumens von JSON-Dateien in eine Streamingtabelle:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

Um Daten aus cloudbasiertem Speicher zu lesen, können Sie auch das automatische Laden verwenden:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Weitere Informationen zum automatischen Laden finden Sie unter "Was ist auto Loader?" Weitere Informationen zur Verwendung des automatischen Ladeprogramms in SQL mit Beispielen finden Sie unter Laden von Daten aus objektspeicher.

Erfassung per Streaming aus anderen Quellen

Ein Beispiel für die Erfassung aus anderen Quellen, einschließlich Kafka, finden Sie unter Laden von Daten mit Lakeflow Declarative Pipelines.

Nur neue Daten aufnehmen

Standardmäßig liest die read_files-Funktion alle vorhandenen Daten im Quellverzeichnis während der Tabellenerstellung und verarbeitet dann neu eingehende Datensätze mit jeder Aktualisierung.

Um das Aufnehmen von Daten zu vermeiden, die zum Zeitpunkt der Tabellenerstellung bereits im Quellverzeichnis vorhanden sind, legen Sie die Option includeExistingFiles auf falsefest. Dies bedeutet, dass nur Daten verarbeitet werden, die nach der Tabellenerstellung im Verzeichnis eingehen. Beispiel:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

Festlegen des Laufzeitkanals

Streamingtabellen, die mit SQL-Lagerhäusern erstellt wurden, werden automatisch mithilfe einer Pipeline aktualisiert. Lakeflow Declarative Pipelines verwenden standardmäßig die Laufzeit im current Kanal. Lesen Sie die Versionshinweise zu Lakeflow Declarative Pipelines und den Releaseupgradeprozess , um mehr über den Veröffentlichungsprozess zu erfahren.

Databricks empfiehlt die Verwendung des current Kanals für Produktionsworkloads. Neue Features werden zuerst im preview Kanal veröffentlicht. Sie können eine Pipeline auf den Vorschaukanal von Lakeflow Declarative Pipelines setzen, um neue Features zu testen, indem Sie preview als Tabelleneigenschaft angeben. Sie können diese Eigenschaft angeben, wenn Sie die Tabelle erstellen oder nachdem die Tabelle mithilfe einer ALTER-Anweisung erstellt wurde.

Das folgende Codebeispiel zeigt, wie Sie den Kanal in einer CREATE-Anweisung auf eine Vorschau festlegen:

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

Ausblenden vertraulicher Daten

Von Bedeutung

Dieses Feature befindet sich in der Public Preview.

Sie können Streamingtabellen verwenden, um vertrauliche Daten von Benutzern auszublenden, die auf die Tabelle zugreifen. Ein Ansatz besteht darin, die Abfrage so zu definieren, dass vertrauliche Spalten oder Zeilen vollständig ausgeschlossen werden. Alternativ können Sie Spaltenmasken oder Zeilenfilter basierend auf den Berechtigungen des Abfragebenutzers anwenden. Beispielsweise könnten Sie die tax_id Spalte für Benutzer ausblenden, die sich nicht in der Gruppe HumanResourcesDeptbefinden. Verwenden Sie dazu die ROW FILTER und MASK Syntax während der Erstellung der Streamingtabelle. Weitere Informationen finden Sie unter Zeilenfilter und Spaltenmasken.

Aktualisieren einer Streamingtabelle

Aktualisierungen können automatisch geplant werden, wenn Sie die Streamingtabelle erstellen. Sie können Streamingtabellen auch manuell aktualisieren. Auch wenn Sie über eine geplante Aktualisierung verfügen, können Sie jederzeit eine manuelle Aktualisierung aufrufen. Aktualisierungen werden von derselben Pipeline behandelt, die automatisch zusammen mit der Streamingtabelle erstellt wurde.

So aktualisieren Sie eine Streamingtabelle:

REFRESH STREAMING TABLE sales;

Sie können den Status der neuesten Aktualisierung mit DESCRIBE TABLE EXTENDED überprüfen.

Hinweis

Nur der Tabellenbesitzer kann eine Streamingtabelle aktualisieren, um die neuesten Daten abzurufen. Der Benutzer, der die Tabelle erstellt, ist der Besitzer, und der Besitzer kann nicht geändert werden. Möglicherweise müssen Sie Ihre Streamingtabelle aktualisieren, bevor Sie Zeitreise-Abfragen verwenden.

Funktionsweise der Aktualisierung

Eine Aktualisierung einer Streamingtabelle wertet nur neue Zeilen aus, die seit der letzten Aktualisierung eingegangen sind, und fügt nur die neuen Daten an.

Jede Aktualisierung verwendet die aktuelle Definition der Streamingtabelle, um diese neuen Daten zu verarbeiten. Das Ändern einer Streamingtabellendefinition berechnet vorhandene Daten nicht automatisch neu. Wenn eine Änderung nicht mit vorhandenen Daten kompatibel ist (z. B. ändern eines Datentyps), schlägt die nächste Aktualisierung mit einem Fehler fehl.

In den folgenden Beispielen wird erläutert, wie sich Änderungen an einer Streamingtabellendefinition auf das Aktualisierungsverhalten auswirken:

  • Durch das Entfernen eines Filters werden zuvor gefilterte Zeilen nicht erneut verarbeitet.
  • Das Ändern von Spaltenprojektionen wirkt sich nicht darauf aus, wie vorhandene Daten verarbeitet wurden.
  • Verknüpfungen mit statischen Momentaufnahmen verwenden den Momentaufnahmezustand zum Zeitpunkt der anfänglichen Verarbeitung. Spät ankommende Daten, die mit der aktualisierten Momentaufnahme übereinstimmen würden, werden ignoriert. Dies kann dazu führen, dass Fakten ignoriert werden, wenn Dimensionen verspätet sind.
  • Das Ändern des CAST einer vorhandenen Spalte führt zu einem Fehler.

Wenn sich Ihre Daten auf eine Weise ändern, die in der vorhandenen Streamingtabelle nicht unterstützt werden kann, können Sie eine vollständige Aktualisierung durchführen.

Vollständiges Aktualisieren einer Streamingtabelle

Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die den gesamten Verlauf der Daten nicht beibehalten oder kurze Aufbewahrungsfristen haben, z. B. Kafka, da die vollständige Aktualisierung die vorhandenen Daten abschneidet. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.

Beispiel:

REFRESH STREAMING TABLE sales FULL;

Ändern des Zeitplans für eine Streamingtabelle

Sie können einen automatischen Aktualisierungszeitplan für Ihre Streamingtabelle ändern (oder festlegen). Die folgenden Beispiele zeigen Ihnen, wie Sie einen Zeitplan mithilfe von ALTER STREAMING TABLE erstellen:

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

Ein Beispiel für Abfragen von Aktualisierungszeitplänen finden Sie unter ALTER STREAMING TABLE.

Nachverfolgen des Status einer Aktualisierung

Sie können den Status einer Aktualisierung einer Streamingtabelle anzeigen, indem Sie die Pipeline anzeigen, die die Streamingtabelle in der Benutzeroberfläche der Deklarativen Pipelines von Lakeflow verwaltet, oder indem Sie die vom Befehl für die Streamingtabelle zurückgegebenen DESCRIBE EXTENDED anzeigen.

DESCRIBE TABLE EXTENDED <table-name>;

Alternativ können Sie die Streamingtabelle im Katalog-Explorer anzeigen und den Aktualisierungsstatus dort anzeigen:

  1. Klicken Sie auf das Symbol Katalog in der Randleiste.
  2. Öffnen Sie im Katalog-Explorer-Baum links den Katalog, und wählen Sie das Schema aus, in dem sich die Streamingtabelle befindet.
  3. Öffnen Sie das Tabellenelement unter dem ausgewählten Schema, und klicken Sie auf die Streamingtabelle.

Von hier aus können Sie die Registerkarten unter dem Namen der Streamingtabelle verwenden, um Informationen zur Streamingtabelle anzuzeigen und zu bearbeiten, einschließlich:

  • Status und Verlauf aktualisieren
  • Das Tabellenschema
  • Beispieldaten (erfordert eine aktive Berechnung)
  • Erlaubnisse
  • Datenherkunft, einschließlich Tabellen und Pipelines, von denen diese Streamingtabelle abhängt
  • Einblicke in die Nutzung
  • Monitore, die Sie für diese Streaming-Tabelle erstellt haben

Steuern des Zugriffs auf Streamingtabellen

Streamingtabellen unterstützen umfassende Zugriffskontrollen, um die gemeinsame Nutzung von Daten zu ermöglichen, ohne potenziell private Daten offenzulegen. Ein Besitzer einer Streamingtabelle oder ein Benutzer mit dem MANAGE-Recht kann anderen Benutzern SELECT-Rechte gewähren. Benutzer mit SELECT Zugriff auf die Streamingtabelle benötigen SELECT keinen Zugriff auf die Tabellen, auf die von der Streamingtabelle verwiesen wird. Diese Zugriffskontrolle ermöglicht das Teilen von Daten und steuert gleichzeitig den Zugriff auf die zugrunde liegenden Daten.

Gewähren von Berechtigungen für eine Streamingtabelle

Verwenden Sie die GRANT Anweisung, um Zugriff auf eine Streamingtabelle zu gewähren:

GRANT <privilege_type> ON <st_name> TO <principal>;

Dies privilege_type kann sein:

  • SELECT – Die benutzende Person kann für die Streamingtabelle Folgendes ausführen: SELECT.
  • REFRESH – Die benutzende Person kann für die Streamingtabelle Folgendes ausführen: REFRESH. Aktualisierungen werden mit den Berechtigungen des Besitzers ausgeführt.

Im folgenden Beispiel wird eine Streamingtabelle erstellt und Benutzern Auswahl- und Aktualisierungsberechtigungen gewährt:

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

Weitere Informationen zum Gewähren von Berechtigungen für sicherungsfähige Unity Catalog-Objekte finden Sie unter Unity Catalog-Berechtigungen und sicherungsfähige Objekte.

Widerrufen von Berechtigungen einer Streamingtabelle

Verwenden Sie die Anweisung REVOKE, um den Zugriff auf eine Streamingtabelle zu widerrufen.

REVOKE privilege_type ON <st_name> FROM principal;

Wenn SELECT Berechtigungen für eine Quelltabelle vom Besitzer der Streamingtabelle oder einem anderen Benutzer entzogen werden, der MANAGE oder SELECT Berechtigungen für die Streamingtabelle erhalten hat, oder die Quelltabelle gelöscht wird, kann der Besitzer der Streamingtabelle oder der Benutzer, dem Zugriff gewährt wurde, weiterhin die Streamingtabelle abfragen. Es kommt jedoch zu folgendem Verhalten:

  • Die Person, die die Streamingtabelle besitzt, oder andere Personen, die den Zugriff auf eine Streamingtabelle verloren haben, können für diese Streamingtabelle nicht mehr REFRESH ausführen, und die Streamingtabelle wird veraltet.
  • Wenn die Automatisierung per Zeitplan erfolgt, schlägt der nächste geplante REFRESH-Vorgang fehl oder wird nicht ausgeführt.

Im folgenden Beispiel wird SELECT die read_only_user-Berechtigung entzogen:

REVOKE SELECT ON st_name FROM read_only_user;

Dauerhaftes Löschen von Datensätzen aus einer Streamingtabelle

Von Bedeutung

Die Unterstützung für die REORG-Anweisung mit Streamingtabellen befindet sich in der Public Preview.

Hinweis

  • Die Verwendung einer REORG Anweisung mit einer Streamingtabelle erfordert Databricks Runtime 15.4 und höher.
  • Obwohl Sie die REORG Anweisung mit einer beliebigen Streamingtabelle verwenden können, ist sie nur erforderlich, wenn Datensätze aus einer Streamingtabelle mit aktivierten Löschvektoren gelöscht werden. Der Befehl hat keine Auswirkung, wenn er mit einer Streamingtabelle ohne aktivierte Löschvektoren verwendet wird.

Um Datensätze physisch aus dem zugrunde liegenden Speicher für eine Streamingtabelle mit aktivierten Löschvektoren zu löschen, z. B. für die DSGVO-Compliance, müssen zusätzliche Schritte ausgeführt werden, um sicherzustellen, dass ein VACUUM Vorgang in den Daten der Streamingtabelle ausgeführt wird.

So löschen Sie Datensätze physisch aus dem zugrunde liegenden Speicher:

  1. Aktualisieren von Datensätzen oder Löschen von Datensätzen aus der Streamingtabelle.
  2. Führen Sie eine REORG Anweisung für die Streamingtabelle aus, die den APPLY (PURGE) Parameter angibt. Beispiel: REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Warten Sie, bis der Datenaufbewahrungszeitraum der Streamingtabelle überschritten wird. Der Standardzeitraum für die Datenaufbewahrung beträgt sieben Tage, kann jedoch mit der delta.deletedFileRetentionDuration Tabelleneigenschaft konfiguriert werden. Siehe Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen.
  4. Führen Sie REFRESH für die Streamingtabelle aus. Siehe Aktualisieren einer Streamingtabelle. Innerhalb von 24 Stunden nach dem REFRESH Vorgang werden Wartungsaufgaben von Lakeflow Declarative Pipelines, einschließlich des Vorgangs, der VACUUM erforderlich ist, um sicherzustellen, dass Datensätze dauerhaft gelöscht werden, automatisch ausgeführt.

Überwachen der Ausführung mithilfe des Abfrageverlaufs

Sie können die Abfrageverlaufsseite verwenden, um auf Abfragedetails und Abfrageprofile zuzugreifen, die Ihnen helfen können, schlecht ausgeführte Abfragen und Engpässe in den Deklarativen Pipelines von Lakeflow zu identifizieren, mit denen Ihre Streamingtabellenaktualisierungen ausgeführt werden. Eine Übersicht über die Art der informationen, die in Abfragehistorien und Abfrageprofilen verfügbar sind, finden Sie unter "Abfrageverlauf " und "Abfrageprofil".

Von Bedeutung

Dieses Feature befindet sich in der Public Preview. Arbeitsbereichsadministratoren und -administratorinnen können dieses Feature über die Seite Vorschau aktivieren. Siehe Verwalten von Azure Databricks Previews.

Alle Anweisungen im Zusammenhang mit Streamingtabellen werden im Abfrageverlauf angezeigt. Sie können das Dropdown-Menü "Statement" verwenden, um einen beliebigen Befehl auszuwählen und die zugehörigen Abfragen zu überprüfen. Auf alle CREATE Anweisungen folgt eine REFRESH Anweisung, die asynchron in einer Pipeline ausgeführt wird. Die REFRESH Anweisungen enthalten in der Regel detaillierte Abfragepläne, die Einblicke in die Optimierung der Leistung bieten.

Führen Sie die folgenden Schritte aus, um auf REFRESH Anweisungen in der Query History UI zuzugreifen:

  1. Klicken Sie auf das Symbol Öffnen Sie in der linken Randleiste die Benutzeroberfläche für den Abfrageverlauf .
  2. Aktivieren Sie das Kontrollkästchen REFRESH im Dropdownfilter Anweisung.
  3. Klicken Sie auf den Namen der Abfrage-Anweisung, um Zusammenfassungsdetails wie die Dauer der Abfrage und aggregierte Metriken anzuzeigen.
  4. Klicken Sie auf " Abfrageprofil anzeigen", um das Abfrageprofil zu öffnen. Details zum Navigieren im Abfrageprofil finden Sie im Abfrageprofil .
  5. Optional können Sie die Verknüpfungen im Abschnitt "Abfragequelle " verwenden, um die zugehörige Abfrage oder Pipeline zu öffnen.

Sie können auch mithilfe von Links im SQL-Editor oder aus einem Notizbuch, das an ein SQL Warehouse angefügt ist, auf Abfragedetails zugreifen.

Weitere Ressourcen