Werken met deltatabellen in Spark

Voltooid

U kunt op verschillende manieren werken met deltatabellen (of delta-indelingsbestanden) om gegevens op verschillende manieren op te halen en te wijzigen.

Spark SQL gebruiken

De meest voorkomende manier om met gegevens in deltatabellen in Spark te werken, is door Spark SQL te gebruiken. U kunt SQL-instructies insluiten in andere talen (zoals PySpark of Scala) met behulp van de spark.sql-bibliotheek . Met de volgende code wordt bijvoorbeeld een rij ingevoegd in de tabel producten .

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

U kunt ook de %%sql magie in een notebook gebruiken om SQL-instructies uit te voeren.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

De Delta-API gebruiken

Als u wilt werken met Delta-bestanden in plaats van catalogustabellen, is het mogelijk eenvoudiger om de Delta Lake-API te gebruiken. U kunt een exemplaar van een DeltaTable maken op basis van een maplocatie met bestanden in delta-indeling en vervolgens de API gebruiken om de gegevens in de tabel te wijzigen.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Tijdreizen gebruiken om te werken met tabelversiebeheer

Wijzigingen in deltatabellen worden vastgelegd in het transactielogboek voor de tabel. U kunt de geregistreerde transacties gebruiken om de geschiedenis van wijzigingen in de tabel weer te geven en oudere versies van de gegevens op te halen (ook wel tijdreizen genoemd)

Als u de geschiedenis van een tabel wilt bekijken, kunt u de DESCRIBE SQL-opdracht gebruiken, zoals hier wordt weergegeven.

%%sql

DESCRIBE HISTORY products

De resultaten van deze instructie geven de transacties weer die zijn toegepast op de tabel, zoals hier wordt weergegeven (sommige kolommen zijn weggelaten):

version tijdstempel schakelapparatuur optimaliseren operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicaat":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z SCHRIJVEN {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}}

Als u de geschiedenis van een externe tabel wilt bekijken, kunt u de maplocatie opgeven in plaats van de tabelnaam.

%%sql

DESCRIBE HISTORY 'Files/mytable'

U kunt gegevens ophalen uit een specifieke versie van de gegevens door de locatie van het Delta-bestand te lezen in een dataframe, waarbij u de versie opgeeft die is vereist als een versionAsOf optie:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

U kunt ook een tijdstempel opgeven met behulp van de timestampAsOf optie:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)