Freigeben über


Lernprogramm: Erstellen und Verwalten von Delta Lake-Tabellen

In diesem Lernprogramm werden allgemeine Delta-Tabellenvorgänge mithilfe von Beispieldaten veranschaulicht. Delta Lake ist die optimierte Speicherebene, die die Grundlage für Tabellen auf Databricks bietet. Sofern nicht anders angegeben, sind alle Tabellen in Databricks Delta-Tabellen.

Bevor Sie anfangen

Um die Schritte dieses Tutorials abzuschließen, benötigen Sie Folgendes:

  • Berechtigung zum Verwenden einer vorhandenen Computeressource oder Erstellen einer neuen Computeressource. Siehe Compute.
  • Unity-Katalogberechtigungen: USE CATALOG, USE SCHEMA, und CREATE TABLE im workspace Katalog. Wenden Sie sich zum Festlegen dieser Berechtigungen an Ihre Databricks-Administratoren/-Administratorinnen, oder lesen Sie den Artikel Unity Catalog-Berechtigungen und sicherungsfähige Objekte, um mehr zu erfahren.

Diese Beispiele basieren auf einem Dataset namens Synthetic Person Records: 10K to 10M Records. Dieses Dataset enthält fiktive Datensätze von Personen, einschließlich Vornamen und Nachnamen, Geschlecht und Alter.

Laden Sie zuerst das Dataset für dieses Lernprogramm herunter.

  1. Besuchen Sie die Seite "Synthetische Personendatensätze: 10K bis 10M Datensätze " auf Kaggle.
  2. Klicken Sie auf "Herunterladen " und dann auf "Dataset herunterladen" als ZIP-Datei. Dadurch wird eine Datei namens archive.zip auf Ihren lokalen Computer heruntergeladen.
  3. Extrahieren Sie den archive Ordner aus der archive.zip Datei.

Laden Sie als Nächstes das person_10000.csv Dataset in ein Unity-Katalogvolume in Ihrem Azure Databricks-Arbeitsbereich hoch. Azure Databricks empfiehlt, Ihre Daten in ein Unity-Katalogvolume hochzuladen, da Volumes Funktionen für den Zugriff auf, das Speichern, Verwalten und Organisieren von Dateien bieten.

  1. Öffnen Sie den Katalog-Explorer, indem Sie auf das Datensymbol klicken.Katalog in der Randleiste.
  2. Klicken Sie im Katalog-Explorer auf Hinzufügen oder Plussymbol"Daten hinzufügen " und "Volume erstellen".
  3. Benennen Sie das Volume my-volume , und wählen Sie "Verwaltetes Volume " als Volumetyp aus.
  4. Wählen Sie den workspace Katalog und das default Schema aus, und klicken Sie dann auf "Erstellen".
  5. Öffnen my-volume und klicken Sie auf "Auf dieses Volume hochladen".
  6. Ziehen und ablegen oder navigieren Sie zu der Datei und wählen Sie sie person_10000.csv im archive Ordner auf Ihrem lokalen Computer aus.
  7. Klicken Sie auf Hochladen.

Erstellen Sie zuletzt ein Notizbuch zum Ausführen des Beispielcodes.

  1. Klicken Sie in der Randleiste auf Neu ".
  2. Klicken Sie auf das Symbol Notizbuch zum Erstellen eines neuen Notizbuchs.
  3. Wählen Sie eine Sprache für das Notizbuch aus.

Erstellen einer Tabelle

Erstellen Sie eine neue Unity-Katalog verwaltete Tabelle workspace.default.people_10k aus person_10000.csv. Delta Lake ist die Standardeinstellung für alle Tabellenerstellungs-, Lese- und Schreibbefehle in Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

Es gibt verschiedene Möglichkeiten zum Erstellen oder Klonen von Tabellen. Weitere Informationen finden Sie unter CREATE TABLE.

In Databricks Runtime 13.3 LTS und höher können Sie mit CREATE TABLE LIKE eine neue leere Delta-Tabelle erstellen, die die Schema- und Tabelleneigenschaften einer Quell-Delta-Tabelle dupliziert. Dies kann nützlich sein, wenn Tabellen aus einer Entwicklungsumgebung in die Produktion übertragen werden.

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

Wichtig

Dieses Feature befindet sich in der Public Preview.

Verwenden Sie die DeltaTableBuilder API für Python und Scala , um eine leere Tabelle zu erstellen. Im Vergleich zu DataFrameWriter und DataFrameWriterV2, erleichtert die DeltaTableBuilder API die Angabe zusätzlicher Informationen wie Spaltenkommentare, Tabelleneigenschaften und generierte Spalten.

Python

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Scala

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

Upsert in eine Tabelle

Ändern Sie vorhandene Datensätze in einer Tabelle, oder fügen Sie neue Datensätze mithilfe eines Vorgangs namens "upsert" hinzu. Um eine Reihe von Updates und Einfügungen in eine vorhandene Delta-Tabelle zusammenzuführen, verwenden Sie die DeltaTable.merge Methode in Python und Scala und die MERGE INTO Anweisung in SQL.

Führen Sie z. B. Daten aus der Quelltabelle people_10k_updates mit der Ziel-Delta-Tabelle workspace.default.people_10kzusammen. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Scala

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

In SQL aktualisiert oder fügt der * Operator alle Spalten in der Zieltabelle ein, vorausgesetzt, die Quelltabelle hat die gleichen Spalten wie die Zieltabelle. Wenn die Zieltabelle nicht über dieselben Spalten verfügt, löst die Abfrage einen Analysefehler aus. Außerdem müssen Sie einen Wert für jede Spalte in der Tabelle angeben, wenn Sie einen Einfügevorgang ausführen. Die Spaltenwerte können leer sein, ''z. B. . Wenn Sie einen Einfügevorgang ausführen, müssen Sie nicht alle Werte aktualisieren.

Lesen einer Tabelle

Verwenden Sie den Tabellennamen oder Pfad, um auf Daten in Delta-Tabellen zuzugreifen. Verwenden Sie einen vollqualifizierten Tabellennamen, um auf verwaltete Tabellen im Unity-Katalog zuzugreifen. Der pfadbasierte Zugriff wird nur für Volumes und externe Tabellen unterstützt, nicht für verwaltete Tabellen. Weitere Informationen finden Sie unter Pfadregeln und Zugriff in Unity-Katalogvolumes.

Python

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Scala

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

Schreiben in eine Tabelle

Delta Lake verwendet die Standardsyntax zum Schreiben von Daten in Tabellen. Verwenden Sie den Anfügemodus, um einer vorhandenen Delta-Tabelle neue Daten hinzuzufügen. Im Gegensatz zum Upserting wird beim Schreiben in eine Tabelle nicht nach doppelten Datensätzen gesucht.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Scala

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

In Databricks-Notizbuchzellen werden maximal 10.000 Zeilen oder 2 MB angezeigt, je nachdem, welcher Wert niedriger ist. Da workspace.default.people_10k mehr als 10.000 Zeilen enthalten sind, werden in der Notizbuchausgabe display(df)nur die ersten 10.000 Zeilen angezeigt. Die zusätzlichen Zeilen sind in der Tabelle vorhanden, werden jedoch aufgrund dieses Grenzwerts nicht in der Notizbuchausgabe gerendert. Sie können die zusätzlichen Zeilen anzeigen, indem Sie speziell nach ihnen filtern.

Verwenden Sie den Überschreibmodus, um alle Daten in einer Tabelle zu ersetzen.

Python

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Scala

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

Aktualisieren einer Tabelle

Aktualisieren von Daten in einer Delta-Tabelle basierend auf einem Prädikat. Ändern Sie z. B. die Werte in der gender-Spalte von Female zu F, von Male zu M und von Other zu O.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Löschen aus einer Tabelle

Entfernen Sie Daten, die einem Prädikat aus einer Delta-Tabelle entsprechen. Der folgende Code veranschaulicht z. B. zwei Löschvorgänge: das erste Löschen von Zeilen, bei denen das Alter kleiner als 18 ist, und das Löschen von Zeilen, bei denen das Alter kleiner als 21 ist.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Wichtig

Durch das Löschen werden die Daten aus der neuesten Version der Delta-Tabelle entfernt, aber nicht aus dem physischen Speicher entfernt, bis die alten Versionen explizit abgesaugt werden. Weitere Informationen finden Sie unter Vakuum.

Anzeigen des Tabellenverlaufs

Verwenden Sie die DeltaTable.history Methode in Python und Scala und die DESCRIBE HISTORY Anweisung in SQL, um die Provenienzinformationen für jeden Schreibvorgang in eine Tabelle anzuzeigen.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

Eine frühere Version der Tabelle mithilfe von Zeitsprung abfragen

Abfragen einer älteren Momentaufnahme einer Delta-Tabelle mithilfe der Delta Lake-Zeitreise. Verwenden Sie die Versionsnummer oder den Zeitstempel der Tabelle, um eine bestimmte Version abzufragen. Beispiel: Abfrageversion 0 oder Zeitstempel 2026-01-05T23:09:47.000+00:00 aus dem Tabellenverlauf.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

Bei Zeitstempeln werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert. Beispielsweise müssen Zeichenfolgen als "2026-01-05T22:43:15.000+00:00" oder "2026-01-05 22:43:15" formatiert werden.

Verwenden Sie DataFrameReader Optionen, um einen DataFrame aus einer Delta-Tabelle zu erstellen, die auf eine bestimmte Version oder einen Zeitstempel der Tabelle festgelegt ist.

Python

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Scala

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

Weitere Informationen finden Sie unter "Arbeiten mit Tabellenverlauf".

Optimieren einer Tabelle

Mehrere Änderungen an einer Tabelle können mehrere kleine Dateien erstellen, wodurch die Leseabfrageleistung verlangsamt wird. Verwenden Sie den Optimierungsvorgang, um die Geschwindigkeit zu verbessern, indem Sie kleine Dateien in größere Dateien kombinieren. Siehe OPTIMIZE.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

Hinweis

Wenn die Vorhersageoptimierung aktiviert ist, müssen Sie sie nicht manuell optimieren. Predictive Optimization verwaltet Wartungsaufgaben automatisch. Weitere Informationen finden Sie unter Predictive Optimization für verwaltete Tabellen im Unity-Katalog.

Z-Reihenfolge nach Spalten

Um die Z-Ordnung von Daten zu erreichen und die Leseleistung weiter zu verbessern, geben Sie die Spalten an, nach denen sortiert werden soll. Sortieren Sie z. B. nach der Spalte firstName mit hoher Kardinalität. Weitere Informationen zur Z-Reihenfolge finden Sie unter „Datenüberspringen“.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

Bereinigung von Momentaufnahmen mit der Vakuum-Operation

Delta Lake verfügt über Schnappschussisolation für Lesevorgänge, was bedeutet, dass es sicher ist, einen Optimierungsvorgang durchzuführen, während andere Benutzer oder Jobs die Tabelle abfragen. Sie sollten jedoch schließlich alte Momentaufnahmen bereinigen, da dadurch die Speicherkosten reduziert, die Abfrageleistung verbessert und die Datencompliance sichergestellt wird. Führen Sie den VACUUM Vorgang aus, um alte Momentaufnahmen zu bereinigen. Siehe VACUUM.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

Weitere Informationen zur effektiven Verwendung des Vakuumvorgangs finden Sie unter Entfernen nicht verwendeter Datendateien mit Vakuum.

Nächste Schritte