Delta Live Tables: SQL-Sprachreferenz

Dieser Artikel enthält Details für die SQL-Programmierschnittstelle für Delta Live Tables.

Sie können benutzerdefinierte Python-Funktionen (User-Defined Functions, UDFs) in Ihren SQL-Abfragen verwenden, aber Sie müssen diese UDFs in Python-Dateien definieren, bevor Sie sie in SQL-Quelldateien aufrufen. Weitere Informationen finden Sie unter Benutzerdefinierte Skalarfunktionen: Python.

Begrenzungen

Die PIVOT-Klausel wird nicht unterstützt. Der pivot-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Schema der Ausgabe zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.

Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables

Sie verwenden dieselbe grundlegende SQL-Syntax, wenn Sie entweder eine Streamingtabelle oder eine materialisierte Sicht (auch als LIVE TABLE bezeichnet) deklarieren.

Sie können Streamingtabellen nur mithilfe von Abfragen für einen Streaminglesevorgang deklarieren. Databricks empfiehlt die Verwendung des Autoloaders für die Streamingerfassung von Dateien aus dem Cloudobjektspeicher. Weitere Informationen finden Sie unter SQL-Syntax des Autoloaders.

Sie müssen die STREAM()-Funktion um einen Datasetnamen einschließen, wenn Sie andere Tabellen oder Sichten in Ihrer Pipeline als Streamingquelle angeben.

Im Folgenden wird die Syntax zum Deklarieren materialisierter Sichten und Streamingtabellen mit SQL beschrieben:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Erstellen einer Delta Live Tables-Sicht

Nachfolgend wird die Syntax zum Deklarieren von Sichten mit SQL beschrieben:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-Syntax des Autoloaders

Im Folgenden wird die Syntax für die Arbeit mit dem Autoloader in SQL beschrieben:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Sie können unterstützte Formatoptionen mit dem Autoloader verwenden. Mithilfe der map()-Funktion können Sie eine beliebige Anzahl von Optionen an die cloud_files()-Methode übergeben. Die Optionen sind Schlüssel-Wert-Paare, bei denen die Schlüssel und Werte Zeichenfolgen sind. Ausführliche Informationen zur Unterstützung von Formaten und Optionen finden Sie unter Dateiformatoptionen.

Beispiel: Definieren von Tabellen

Sie können ein Dataset erstellen, indem Sie aus einer externen Datenquelle oder aus Datasets lesen, die in einer Pipeline definiert sind. Um aus einem internen Dataset zu lesen, stellen Sie dem Datasetnamen das Schlüsselwort LIVE voran. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Tabelle namens taxi_raw, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data, die die Tabelle taxi_raw als Eingabe verwendet:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Beispiel: Lesen aus einer Streamingquelle

Um Daten aus einer Streamingquelle zu lesen (z. B. Autoloader oder internes Dataset), definieren Sie eine STREAMING-Tabelle:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Weitere Informationen zu Streamingdaten finden Sie unter Transformieren von Daten mit Delta Live Tables.

Steuern der Materialisierung von Tabellen

Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:

  • Geben Sie an, wie Tabellen mit PARTITIONED BYpartitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen.
  • Sie können Tabelleneigenschaften mithilfe von TBLPROPERTIES festlegen. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
  • Legen Sie mithilfe der LOCATION-Einstellung einen Speicherort fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wenn LOCATION nicht festgelegt ist.
  • Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.

Hinweis

Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, sollten Sie im Allgemeinen keine Partitionsspalten angeben.

Beispiel: Angeben des Schemas und der Partitionsspalten

Sie können optional ein Schema angeben, wenn Sie eine Tabelle definieren. Im folgenden Beispiel wird das Schema für die Zieltabelle angegeben, einschließlich der Verwendung von den von Delta Lake generierten Spalten und der Definition von Partitionsspalten für die Tabelle:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Standardmäßig leitet Delta Live Tables das Schema aus der table-Definition ab, wenn Sie kein Schema angeben.

Festlegen von Konfigurationswerten für eine Tabelle oder Sicht

Verwenden Sie SET, um einen Konfigurationswert für eine Tabelle oder Sicht anzugeben, einschließlich der Spark-Konfigurationen. Jede Tabelle oder Sicht, die Sie in einem Notebook nach der SET-Anweisung definieren, hat Zugriff auf den definierten Wert. Alle Spark-Konfigurationen, die mit der SET-Anweisung angegeben werden, werden verwendet, wenn die Spark-Abfrage für eine Tabelle oder Sicht ausgeführt wird, die auf die SET-Anweisung folgt. Verwenden Sie zum Lesen eines Konfigurationswerts in einer Abfrage die Zeichenfolgeninterpolationssyntax ${}. Im folgenden Beispiel wird ein Spark-Konfigurationswert mit dem Namen startDate festgelegt, und dieser Wert wird in einer Abfrage verwendet:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Um mehrere Konfigurationswerte anzugeben, verwenden Sie eine separate SET-Anweisung für jeden Wert.

SQL-Eigenschaften

CREATE TABLE oder VIEW
TEMPORARY

Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Die Klausel TEMPORARY weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.
STREAMING

Erstellen einer Tabelle, die ein Eingabedataset als Stream liest. Das Eingabedataset muss eine Streamingdatenquelle sein, z. B. Autoloader oder eine STREAMING-Tabelle.
PARTITIONED BY

Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen
LOCATION

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
COMMENT

Dies ist eine optionale Beschreibung für die Tabelle
TBLPROPERTIES

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
select_statement

Eine Delta Live Tables-Abfrage, die das Dataset für die Tabelle definiert
CONSTRAINT-Klausel
EXPECT expectation_name

Definiert die Einschränkung expectation_name für die Datenqualität. Wenn die Einschränkung ON VIOLATION nicht definiert ist, fügen Sie dem Zieldataset Zeilen hinzu, die gegen die Einschränkung verstoßen.
ON VIOLATION

Optionale Aktion für fehlerhafte Zeilen:

* FAIL UPDATE: Die Pipelineausführung wird sofort beendet.
* DROP ROW: Der Datensatz wird abgelegt, und die Verarbeitung wird fortgesetzt.

Change Data Capture mit SQL in Delta Live Tables

Verwenden Sie die APPLY CHANGES INTO-Anweisung, um die CDC-Funktionalität von Delta Live Tables wie folgt zu verwenden:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Sie definieren Datenqualitätseinschränkungen für ein APPLY CHANGES-Ziel unter Verwendung derselben CONSTRAINT-Klausel wie für Abfragen, die nicht vom Typ APPLY CHANGES sind. Siehe Verwalten der Datenqualität mit Delta Live Tables.

Hinweis

Das Standardverhalten für INSERT- und UPDATE-Ereignisse ist das Ausführen eines Upserts von CDC-Ereignissen aus der Quelle: das Aktualisieren aller Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder das Einfügen einer neuen Zeile, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE-Ereignissen kann mit der APPLY AS DELETE WHEN-Bedingung angegeben werden.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der APPLY CHANGES-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by angeben.

Weitere Informationen finden Sie unter Vereinfachte Change Data Capture mit der APPLY CHANGES API in Delta Live Tables.

Klauseln
KEYS

Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

Diese Klausel ist erforderlich.
IGNORE NULL UPDATES

Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null.

Diese Klausel ist optional.

Die Standardeinstellung ist das Überschreiben vorhandener Spalten mit null-Werten.
APPLY AS DELETE WHEN

Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:
pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft.

Diese Klausel ist optional.
APPLY AS TRUNCATE WHEN

Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.

Die APPLY AS TRUNCATE WHEN-Klausel wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt das Abschneiden nicht.

Diese Klausel ist optional.
SEQUENCE BY

Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen.

Diese Klausel ist erforderlich.
COLUMNS

Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:

* Geben Sie die vollständige Liste der einzuschließenden Spalten an: COLUMNS (userId, name, city).
* Geben Sie eine Liste der auszuschließenden Spalten an: COLUMNS * EXCEPT (operation, sequenceNum).

Diese Klausel ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die COLUMNS-Klausel nicht angegeben ist.
STORED AS

Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

Diese Klausel ist optional.

Der Standardwert ist SCD-Typ 1.
TRACK HISTORY ON

Gibt eine Teilmenge der Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorgenommen werden. Sie haben folgende Möglichkeiten:

* Geben Sie die vollständige Liste der nachzuverfolgenden Spalten an: COLUMNS (userId, name, city).
* Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen: COLUMNS * EXCEPT (operation, sequenceNum)

Diese Klausel ist optional. Der Standardwert ist das Nachverfolgen des Verlaufs für alle Ausgabespalten, wenn Änderungen vorhanden sind. Dies entspricht TRACK HISTORY ON *.