Tutorial: Delta Lake
In diesem Tutorial werden allgemeine Delta Lake-Vorgänge in Azure Databricks vorgestellt, einschließlich der folgenden:
- Erstellen einer Tabelle
- Upsert in eine Tabelle
- Lesen aus einer Tabelle
- Anzeigen des Tabellenverlaufs
- Abfragen einer früheren Version einer Tabelle
- Optimieren einer Tabelle
- Hinzufügen eines Indexes mit Z-Reihenfolge
- Bereinigen nicht referenzierter Dateien
Sie können den Python-, Scala- und SQL-Beispielcode in diesem Artikel in einem Notebook ausführen, das an eine Computeressource von Azure Databricks, wie z. B. ein Cluster, angefügt ist. Sie können den SQL-Code in diesem Artikel auch in einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist.
Vorbereiten der Quelldaten
Dieses Tutorial basiert auf einem Dataset namens „Personen 10 M“. Es enthält 10 Millionen fiktive Datensätze, die Angaben zu Personen enthalten, wie Vorname und Nachnamen, Geburtsdatum und Gehalt. In diesem Tutorial wird davon ausgegangen, dass sich dieses Dataset in einem Unity-Katalogvolume befindet, das Ihrem Azure Databricks-Zielarbeitsbereich zugeordnet ist.
Gehen Sie wie folgt vor, um das Dataset „Personen 10 M“ für dieses Tutorial abzurufen:
- Wechseln Sie zur Seite Personen 10 M in Kaggle.
- Klicken Sie auf Download, um eine Datei namens
archive.zip
auf Ihren lokalen Computer herunterzuladen. - Extrahieren Sie die Datei namens
export.csv
aus derarchive.zip
-Datei. Dieexport.csv
-Datei enthält die Daten für dieses Tutorial.
Gehen Sie wie folgt vor, um die export.csv
-Datei in das Volume hochzuladen:
- Klicken Sie auf der Seitenleiste auf Katalog.
- Navigieren Sie im Katalog-Explorer zu dem Volume, in das Sie die Datei hochladen möchten, und öffnen Sie sie die
export.csv
-Datei. - Klicken Sie auf die Schaltfläche Upload to this volume (In dieses Volume hochladen).
- Ziehen sie die
export.csv
-Datei und legen Sie sie ab, oder navigieren Sie zu der Datei auf Ihrem lokalen Computer und wählen Sie sie aus. - Klicken Sie auf Hochladen.
Ersetzen Sie in den folgenden Codebeispielen /Volumes/main/default/my-volume/export.csv
durch den Pfad zur Datei export.csv
in Ihrem Zielvolume.
Erstellen einer Tabelle
In Azure Databricks erstellte Tabellen verwenden standardmäßig das Delta Lake-Protokoll. Databricks empfiehlt die Verwendung von verwalteten Unity Catalog-Tabellen.
Ersetzen Sie im vorherigen Codebeispiel und den folgenden Codebeispielen den Tabellennamen main.default.people_10m
durch ihren dreiteiligen Zielkatalog, Schema und Tabellennamen im Unity-Katalog.
Hinweis
Delta Lake ist die Standardeinstellung für alle Lese-, Schreib- und Tabellenerstellungsbefehle für Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Die vorherigen Vorgänge erstellen eine neue verwaltete Tabelle. Informationen zu den verfügbaren Optionen beim Erstellen einer Delta-Tabelle finden Sie unter CREATE TABLE.
In Databricks Runtime 13.3 LTS und höher können Sie mit CREATE TABLE LIKE eine neue leere Delta-Tabelle erstellen, welche die Schema- und Tabelleneigenschaften einer Delta-Quelltabelle dupliziert. Dies kann besonders nützlich sein, wenn Tabellen aus einer Entwicklungsumgebung in die Produktion hochgestuft werden, wie im folgenden Codebeispiel gezeigt:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Um eine leere Tabelle zu erstellen, können Sie auch die DeltaTableBuilder
-API in Delta Lake für Python und Scala verwenden. Im Vergleich zu entsprechenden DataFrameWriter-APIs erleichtern diese APIs die Angabe zusätzlicher Informationen, darunter Spaltenkommentare, Tabelleneigenschaften und generierte Spalten.
Wichtig
Dieses Feature befindet sich in der Public Preview.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert in eine Tabelle
Um eine Reihe von Updates und Einfügungen in einer vorhandene Delta-Tabelle zusammenzuführen, verwenden Sie die DeltaTable.merge
-Methode für Python und Scala sowie die MERGE INTO-Anweisung für SQL. Mit der folgenden Anweisung werden beispielsweise Daten aus der Quelltabelle übernommen und mit der Delta-Zieltabelle zusammengeführt. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt. Dieser Vorgang wird als Upsert bezeichnet.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Wenn Sie in SQL *
angeben, werden alle Spalten in der Zieltabelle aktualisiert oder eingefügt, vorausgesetzt, die Quelltabelle enthält die gleichen Spalten wie die Zieltabelle. Wenn die Zieltabelle nicht über dieselben Spalten verfügt, löst die Abfrage einen Analysefehler aus.
Beim Ausführen eines Einfügevorgangs muss für jede Spalte in der Tabelle ein Wert angegeben werden (z. B. wenn im vorhandenen Dataset keine übereinstimmende Zeile vorhanden ist). Es müssen jedoch nicht alle Werte aktualisiert werden.
Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Lesen einer Tabelle
Sie greifen auf Daten in Delta-Tabellen über den Tabellennamen oder -pfad zu, wie in den folgenden Beispielen gezeigt:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Schreiben in eine Tabelle
Delta Lake verwendet Standardsyntax zum Schreiben von Daten in Tabellen.
Verwenden Sie den Anfügemodus, wie in den folgenden Beispielen gezeigt, um einer vorhandenen Delta-Tabelle atomisch neue Daten hinzuzufügen:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Um alle Daten in einer Tabelle zu ersetzen, verwenden Sie den Überschreibmodus wie in den folgenden Beispielen:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Aktualisieren einer Tabelle
Sie können Daten aktualisieren, die einem Prädikat in einer Delta-Tabelle entsprechen. Beispielsweise können Sie in der Beispieltabelle namens people_10m
eine Abkürzung in der gender
-Spalte von M
oder F
zu Male
oder Female
zu ändern:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Löschen aus einer Tabelle
Sie können Daten, die einem Prädikat entsprechen, aus einer Delta-Tabelle entfernen. In der Beispieltabelle namens people_10m
können Sie beispielsweise Folgendes ausführen, um alle Zeilen zu löschen, die Personen mit einem Wert in der Spalte birthDate
vor 1955
entsprechen:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Wichtig
Durch das Löschen werden die Daten aus der neuesten Version der Delta-Tabelle, aber nicht aus dem physischen Speicher entfernt, bis die alten Versionen explizit abgesaugt werden. Details finden Sie unter Vacuum.
Anzeigen des Tabellenverlaufs
Um den Verlauf einer Tabelle anzuzeigen, verwenden Sie die DeltaTable.history
-Methode für Python und Scala, sowie die DESCRIBE HISTORY-Anweisung in SQL, die Herkunftsinformationen bereitstellt, einschließlich der Tabellenversion, des Vorgangs, des Benutzers usw. für jeden Schreibvorgang in einer Tabelle.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Abfragen einer früheren Version der Tabelle (Zeitreise)
Mit einer Delta Lake-Zeitreise können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen.
Um eine ältere Version einer Tabelle abzufragen, geben Sie die Version oder den Zeitstempel der Tabelle an. Wenn Sie beispielsweise Version 0 oder Zeitstempel 2024-05-15T22:43:15.000+00:00Z
aus dem vorherigen Verlauf abfragen möchten, verwenden Sie Folgendes:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Als Zeitstempel werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert, z. B. "2024-05-15T22:43:15.000+00:00"
oder "2024-05-15 22:43:15"
.
Mit den DataFrameReader-Optionen können Sie beispielsweise in Python einen DataFrame aus einer Delta-Tabelle erstellen, der auf eine bestimmte Version oder einen Zeitstempel der Tabelle festgelegt ist:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Weitere Informationen finden Sie unter Arbeiten mit dem Delta Lake-Tabellenverlauf.
Optimieren einer Tabelle
Nachdem Sie mehrere Änderungen an einer Tabelle vorgenommen haben, verfügen Sie möglicherweise über viele kleine Dateien. Zur Beschleunigung von Leseabfragen können Sie „Vorgang optimieren“ verwenden, um kleine Dateien zu größeren zusammenzufassen:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Z-Reihenfolge nach Spalten
Zur Verbesserung der Leseleistung können Sie verwandte Informationen in einer Dateiengruppe entsprechend der Z-Reihenfolge zusammenstellen. Delta Lake-Datensprungalgorithmen verwenden diese Kollokation, um die Menge der zu lesenden Daten erheblich zu reduzieren. Wenn Daten in der Z-Reihenfolge sortiert werden sollen, geben Sie die Spalten, nach denen sortiert werden soll, in der Z-Reihenfolge nach Vorgang an. Führen Sie beispielsweise zum Sortieren nach gender
den folgenden Code aus:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Die vollständigen Optionen, die beim Ausführen des Optimierungsvorgangs verfügbar sind, finden Sie unter Optimieren des Datendateilayouts.
Bereinigen von Momentaufnahmen VACUUM
Delta Lake ermöglicht die Momentaufnahmenisolation für Lesevorgänge, was bedeutet, dass ein Optimierungsvorgang auch dann sicher ausgeführt werden kann, wenn die Tabelle von anderen Benutzern oder Aufträgen abgefragt wird. Irgendwann sollten Sie alte Momentaufnahmen jedoch bereinigen. Sie können dies tun, indem Sie den Vakuumbetrieb ausführen:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Ausführliche Informationen zur effektiven Verwendung des Vakuumbetriebs finden Sie unter Entfernen nicht verwendeter Datendateien mit Vakuum.