Werken met deltatabellen in Spark
U kunt op verschillende manieren werken met deltatabellen (of delta-indelingsbestanden) om gegevens op verschillende manieren op te halen en te wijzigen.
Spark SQL gebruiken
De meest voorkomende manier om met gegevens in deltatabellen in Spark te werken, is door Spark SQL te gebruiken. U kunt SQL-instructies insluiten in andere talen (zoals PySpark of Scala) met behulp van de spark.sql-bibliotheek . Met de volgende code wordt bijvoorbeeld een rij ingevoegd in de tabel producten .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
U kunt ook de %%sql
magie in een notebook gebruiken om SQL-instructies uit te voeren.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
De Delta-API gebruiken
Als u wilt werken met Delta-bestanden in plaats van catalogustabellen, is het mogelijk eenvoudiger om de Delta Lake-API te gebruiken. U kunt een exemplaar van een DeltaTable maken op basis van een maplocatie met bestanden in delta-indeling en vervolgens de API gebruiken om de gegevens in de tabel te wijzigen.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Tijdreizen gebruiken om te werken met tabelversiebeheer
Wijzigingen in deltatabellen worden vastgelegd in het transactielogboek voor de tabel. U kunt de geregistreerde transacties gebruiken om de geschiedenis van wijzigingen in de tabel weer te geven en oudere versies van de gegevens op te halen (ook wel tijdreizen genoemd)
Als u de geschiedenis van een tabel wilt bekijken, kunt u de DESCRIBE
SQL-opdracht gebruiken, zoals hier wordt weergegeven.
%%sql
DESCRIBE HISTORY products
De resultaten van deze instructie geven de transacties weer die zijn toegepast op de tabel, zoals hier wordt weergegeven (sommige kolommen zijn weggelaten):
version | tijdstempel | schakelapparatuur optimaliseren | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predicaat":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | SCHRIJVEN | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}} |
Als u de geschiedenis van een externe tabel wilt bekijken, kunt u de maplocatie opgeven in plaats van de tabelnaam.
%%sql
DESCRIBE HISTORY 'Files/mytable'
U kunt gegevens ophalen uit een specifieke versie van de gegevens door de locatie van het Delta-bestand te lezen in een dataframe, waarbij u de versie opgeeft die is vereist als een versionAsOf
optie:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
U kunt ook een tijdstempel opgeven met behulp van de timestampAsOf
optie:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)