Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Lakeflow Spark Declarative Pipelines (SDP) führt mehrere neue SQL-Schlüsselwörter und -Funktionen zum Definieren materialisierter Ansichten und Streamingtabellen in Pipelines ein. DIE SQL-Unterstützung für die Entwicklung von Pipelines basiert auf den Grundlagen von Spark SQL und bietet Unterstützung für strukturiertes Streaming.
Benutzer, die mit PySpark DataFrames vertraut sind, bevorzugen möglicherweise die Entwicklung von Pipelinecode mit Python. Python unterstützt umfangreichere Tests und Vorgänge, die schwierig mit SQL zu implementieren sind, z. B. Metaprogrammierungsoperationen. Siehe Entwickeln von Pipelinecode mit Python.
Eine vollständige Referenz zur SQL-Pipelinesyntax finden Sie in der Sql-Sprachreferenz für Pipelines.
Grundlagen von SQL für die Pipelineentwicklung
SQL-Code, der Pipeline-Datasets erstellt, verwendet die CREATE OR REFRESH Syntax zum Definieren materialisierter Ansichten und Streamingtabellen für Abfrageergebnisse.
Das STREAM Schlüsselwort gibt an, ob die datenquelle, auf die in einer SELECT Klausel verwiesen wird, mit Streamingsemantik gelesen werden soll.
Liest und schreibt standardmäßig im Katalog und Schema, die während der Pipelinekonfiguration angegeben wurden. Siehe Festlegen des Zielkatalogs und des Schemas.
Pipelinequellcode unterscheidet sich kritisch von SQL-Skripts: SDP wertet alle Datasetdefinitionen für alle Quellcodedateien aus, die in einer Pipeline konfiguriert sind, und erstellt ein Dataflowdiagramm, bevor Abfragen ausgeführt werden. Die Reihenfolge der Abfragen, die in den Quelldateien angezeigt werden, definiert die Reihenfolge der Codeauswertung, aber nicht die Reihenfolge der Abfrageausführung.
Erstellen einer materialisierten Ansicht mit SQL
Im folgenden Codebeispiel wird die grundlegende Syntax zum Erstellen einer materialisierten Ansicht mit SQL veranschaulicht:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Erstellen einer Streamingtabelle mit SQL
Im folgenden Codebeispiel wird die grundlegende Syntax zum Erstellen einer Streamingtabelle mit SQL veranschaulicht. Beim Lesen einer Quelle für eine Streamingtabelle gibt das STREAM Schlüsselwort an, dass die Streamingsemantik für die Quelle verwendet wird. Verwenden Sie das STREAM Schlüsselwort beim Erstellen einer materialisierten Ansicht nicht:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Hinweis
Verwenden Sie das STREAM-Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen. Zum Einlesen von Daten mit Änderungs-Commits können Sie Python und die SkipChangeCommits Option zur Fehlerbehandlung verwenden.
Laden von Daten aus objektspeicher
Pipelines unterstützen das Laden von Daten aus allen Von Azure Databricks unterstützten Formaten. Siehe Datenformatoptionen.
Hinweis
In diesen Beispielen werden Daten verwendet, die im Pfad „/databricks-datasets“ verfügbar sind, der automatisch in Ihren Arbeitsbereich eingebunden wurde. Databricks empfiehlt die Verwendung von Volumepfaden oder Cloud-URIs, um auf daten zu verweisen, die im Cloudobjektspeicher gespeichert sind. Weitere Informationen finden Sie unter Was sind Unity Catalog-Volumes?.
Databricks empfiehlt die Verwendung von AutoLade- und Streamingtabellen beim Konfigurieren von inkrementellen Erfassungsworkloads für Daten, die im Cloudobjektspeicher gespeichert sind. Siehe Was ist Autoloader?.
SQL verwendet die read_files Funktion, um die Auto Loader-Funktionalität zu nutzen. Sie müssen das STREAM Schlüsselwort auch zum Konfigurieren eines Streaming-Lesevorgangs mit read_files verwenden.
Im Folgenden wird die Syntax von read_files in SQL beschrieben.
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Die Optionen für das automatische Laden sind Schlüsselwertpaare. Ausführliche Informationen zu unterstützten Formaten und Optionen finden Sie unter "Optionen".
Im folgenden Beispiel wird eine Streamingtabelle aus JSON-Dateien mit dem automatischen Ladeprogramm erstellt:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Die read_files Funktion unterstützt auch Batchsemantik zum Erstellen materialisierter Ansichten. Im folgenden Beispiel wird Batchsemantik verwendet, um ein JSON-Verzeichnis zu lesen und eine materialisierte Ansicht zu erstellen:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Überprüfen von Daten mit Erwartungen
Sie können die Erwartungen verwenden, um Einschränkungen für die Datenqualität festzulegen und zu erzwingen. Weitere Informationen finden Sie unter Verwalten der Datenqualität mit Pipelineerwartungen.
Mit dem folgenden Code wird eine Erwartung namens valid_data definiert, die Datensätze abbricht, die während der Datenaufnahme NULL sind:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Führen Sie Abfragen für materialisierte Ansichten und Streaming-Tabellen aus, die in Ihrer Pipeline definiert sind.
Im folgenden Beispiel werden vier Datasets definiert:
- Eine Streamingtabelle namens
orders, die JSON-Daten lädt. - Eine materialisierte Ansicht namens
customers, die CSV-Daten lädt. - Eine materialisierte Ansicht namens
customer_orders, die Datensätze aus den Datensätzenordersundcustomersverknüpft, den Zeitstempel der Reihenfolge in ein Datum wandelt und die Feldercustomer_id,order_number,stateundorder_dateauswählt. - Eine materialisierte Ansicht namens
daily_orders_by_state, die die tägliche Zählung der Bestellungen für jedes Bundesland aggregiert.
Hinweis
Beim Abfragen von Ansichten oder Tabellen in Ihrer Pipeline können Sie den Katalog und das Schema direkt angeben oder die in Ihrer Pipeline konfigurierten Standardwerte verwenden. In diesem Beispiel werden die Tabellen orders, customersund customer_orders aus dem Standardkatalog und dem für Ihre Pipeline konfigurierten Schema geschrieben und gelesen.
Im Legacyveröffentlichungsmodus wird das LIVE Schema verwendet, um andere materialisierte Ansichten und Streamingtabellen abzufragen, die in Ihrer Pipeline definiert sind. In neuen Pipelines wird die LIVE-Schemasyntax stillschweigend ignoriert. Weitere Informationen finden Sie unter LIVE-Schema (Legacy).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Definieren einer privaten Tabelle
Sie können die PRIVATE Klausel beim Erstellen einer materialisierten Ansicht oder einer Streamingtabelle verwenden. Wenn Sie eine private Tabelle erstellen, erstellen Sie die Tabelle, aber nicht die Metadaten für die Tabelle. Die PRIVATE Klausel weist SDP an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, bleibt eine private Tabelle über die gesamte Lebensdauer der Pipeline, die sie erstellt, bestehen, aber nicht nur bei einem einzelnen Update.
Private Tabellen können denselben Namen wie Tabellen im Katalog haben. Wenn Sie einen nicht qualifizierten Namen für eine Tabelle in einer Pipeline angeben, wird die private Tabelle verwendet, wenn sowohl eine private Tabelle als auch eine Katalogtabelle mit diesem Namen vorhanden ist.
Private Tabellen wurden zuvor als temporäre Tabellen bezeichnet.
Dauerhaftes Löschen von Datensätzen aus einer materialisierten Ansicht oder Streamingtabelle
Um Datensätze dauerhaft aus einer Streamingtabelle mit aktivierten Löschvektoren zu löschen, z. B. für die DSGVO-Compliance, müssen zusätzliche Vorgänge für die zugrunde liegenden Delta-Tabellen des Objekts ausgeführt werden. Informationen zum Sicherstellen des Löschens von Datensätzen aus einer Streamingtabelle finden Sie unter Dauerhaftes Löschen von Datensätzen aus einer Streamingtabelle.
Materialisierte Ansichten spiegeln immer die Daten in den zugrunde liegenden Tabellen wider, wenn sie aktualisiert werden. Um Daten in einer materialisierten Ansicht zu löschen, müssen Sie die Daten aus der Quelle löschen und die materialisierte Ansicht aktualisieren.
Parametrisieren von Werten, die beim Deklarieren von Tabellen oder Sichten mit SQL verwendet werden
Verwenden Sie SET, um einen Konfigurationswert in einer Abfrage anzugeben, die eine Tabelle oder Sicht deklariert (einschließlich der Spark-Konfigurationen). Jede Tabelle oder Ansicht, die Sie in einer Quelldatei definieren, hat Zugriff auf den definierten Wert, nachdem die SET Anweisung ausgeführt wurde. Alle Spark-Konfigurationen, die mithilfe der SET-Anweisung angegeben werden, werden beim Ausführen der Spark-Abfrage für eine beliebige Tabelle oder Ansicht nach der SET-Anweisung verwendet. Verwenden Sie zum Lesen eines Konfigurationswerts in einer Abfrage die Zeichenfolgen-Interpolationssyntax ${}. Im folgenden Beispiel wird ein Spark-Konfigurationswert mit dem Namen startDate festgelegt, und dieser Wert wird in einer Abfrage verwendet:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Um mehrere Konfigurationswerte anzugeben, verwenden Sie eine separate SET-Anweisung für jeden Wert.
Einschränkungen
Die PIVOT-Klausel wird nicht unterstützt. Der pivot-Vorgang in Spark erfordert das vorzeitige Laden von Eingabedaten, um das Ausgabeschema zu berechnen. Diese Funktion wird in Pipelines nicht unterstützt.
Hinweis
Die CREATE OR REFRESH LIVE TABLE-Syntax zum Erstellen einer materialisierten Sicht ist veraltet. Stattdessen verwenden Sie CREATE OR REFRESH MATERIALIZED VIEW.