Arbeiten mit Deltatabellen in Spark
Sie können mit Deltatabellen (oder Dateien im Deltaformat) arbeiten, um Daten auf verschiedene Arten abzurufen und zu ändern.
Verwendung von Spark SQL
Spark SQL ist die gängigste Methode zum Arbeiten mit Daten in Spark-Deltatabellen. Sie können SQL-Anweisungen in anderen Sprachen (z. B. PySpark oder Scala) einbetten, indem Sie die Bibliothek spark.sql verwenden. Der folgende Code fügt beispielsweise eine Zeile in die Tabelle products ein.
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Alternativ können Sie den Magic-Befehl %%sql
in einem Notebook verwenden, um SQL-Anweisungen auszuführen.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Verwendung der Delta-API
Wenn Sie mit Delta-Dateien anstelle von Katalogtabellen arbeiten möchten, ist es möglicherweise einfacher, die Delta Lake-API zu verwenden. Sie können eine Instanz einer Deltatabelle aus einem Ordner erstellen, der Änderungsdateien enthält, und dann die API verwenden, um die Daten in der Tabelle zu bearbeiten.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Verwenden von Zeitreisen für die Versionsverwaltung von Tabellen
Änderungen an Deltatabellen werden im Transaktionsprotokoll für die Tabelle protokolliert. Sie können die protokollierten Transaktionen verwenden, um den Verlauf der an der Tabelle vorgenommenen Änderungen anzuzeigen und ältere Versionen der Daten abzurufen (als Zeitreise bezeichnet).
Um den Verlauf einer Tabelle anzuzeigen, können Sie wie hier gezeigt den SQL-Befehl DESCRIBE
verwenden.
%%sql
DESCRIBE HISTORY products
Die Ergebnisse dieser Anweisung sind die Transaktionen, die auf die Tabelle angewendet wurden (einige Spalten wurden weggelassen):
version | timestamp | operation | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predicate":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | WRITE | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
Um den Verlauf einer externen Tabelle anzuzeigen, können Sie anstelle des Tabellennamens den Ordnerspeicherort angeben.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Sie können Daten aus einer bestimmten Datenversion abrufen, indem Sie den Speicherort der Änderungsdatei in einen Dataframe einlesen und dabei die gewünschte Version als versionAsOf
-Option angeben:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Alternativ können Sie einen Zeitstempel mithilfe der Option timestampAsOf
angeben:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)