Arbeiten mit Deltatabellen in Spark

Abgeschlossen

Sie können mit Deltatabellen (oder Dateien im Deltaformat) arbeiten, um Daten auf verschiedene Arten abzurufen und zu ändern.

Verwendung von Spark SQL

Spark SQL ist die gängigste Methode zum Arbeiten mit Daten in Spark-Deltatabellen. Sie können SQL-Anweisungen in anderen Sprachen (z. B. PySpark oder Scala) einbetten, indem Sie die Bibliothek spark.sql verwenden. Der folgende Code fügt beispielsweise eine Zeile in die Tabelle products ein.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Alternativ können Sie den Magic-Befehl %%sql in einem Notebook verwenden, um SQL-Anweisungen auszuführen.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Verwendung der Delta-API

Wenn Sie mit Delta-Dateien anstelle von Katalogtabellen arbeiten möchten, ist es möglicherweise einfacher, die Delta Lake-API zu verwenden. Sie können eine Instanz einer Deltatabelle aus einem Ordner erstellen, der Änderungsdateien enthält, und dann die API verwenden, um die Daten in der Tabelle zu bearbeiten.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Verwenden von Zeitreisen für die Versionsverwaltung von Tabellen

Änderungen an Deltatabellen werden im Transaktionsprotokoll für die Tabelle protokolliert. Sie können die protokollierten Transaktionen verwenden, um den Verlauf der an der Tabelle vorgenommenen Änderungen anzuzeigen und ältere Versionen der Daten abzurufen (als Zeitreise bezeichnet).

Um den Verlauf einer Tabelle anzuzeigen, können Sie wie hier gezeigt den SQL-Befehl DESCRIBE verwenden.

%%sql

DESCRIBE HISTORY products

Die Ergebnisse dieser Anweisung sind die Transaktionen, die auf die Tabelle angewendet wurden (einige Spalten wurden weggelassen):

version timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Um den Verlauf einer externen Tabelle anzuzeigen, können Sie anstelle des Tabellennamens den Ordnerspeicherort angeben.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Sie können Daten aus einer bestimmten Datenversion abrufen, indem Sie den Speicherort der Änderungsdatei in einen Dataframe einlesen und dabei die gewünschte Version als versionAsOf-Option angeben:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Alternativ können Sie einen Zeitstempel mithilfe der Option timestampAsOf angeben:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)