Delta Lake-tabellen maken

Voltooid

Delta Lake is gebouwd op tabellen die een relationele opslagabstractie bieden voor bestanden in een data lake.

Een Delta Lake-tabel maken op basis van een dataframe

Een van de eenvoudigste manieren om een Delta Lake-tabel te maken, is door een dataframe op te slaan in de delta-indeling , waarbij een pad wordt opgegeven waarin de gegevensbestanden en gerelateerde metagegevensinformatie voor de tabel moeten worden opgeslagen.

De volgende PySpark-code laadt bijvoorbeeld een dataframe met gegevens uit een bestaand bestand en slaat dat dataframe vervolgens op in een nieuwe maplocatie in delta-indeling :

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Nadat u de deltatabel hebt opgeslagen, bevat de padlocatie die u hebt opgegeven parquet-bestanden voor de gegevens (ongeacht de indeling van het bronbestand dat u in het dataframe hebt geladen) en een _delta_log map met het transactielogboek voor de tabel.

Notitie

In het transactielogboek worden alle gegevenswijzigingen in de tabel vastgelegd. Door elke wijziging bij te houden, kan transactionele consistentie worden afgedwongen en kan versiegegevens voor de tabel worden bewaard.

U kunt een bestaande Delta Lake-tabel vervangen door de inhoud van een dataframe met behulp van de overschrijfmodus , zoals hier wordt weergegeven:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

U kunt ook rijen uit een dataframe toevoegen aan een bestaande tabel met behulp van de toevoegmodus :

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Voorwaardelijke updates maken

Hoewel u gegevenswijzigingen in een dataframe kunt aanbrengen en vervolgens een Delta Lake-tabel kunt vervangen door deze te overschrijven, is het gebruikelijker om rijen in een bestaande tabel in te voegen, bij te werken of te verwijderen als discrete transactionele bewerkingen. Als u dergelijke wijzigingen wilt aanbrengen in een Delta Lake-tabel, kunt u het DeltaTable-object gebruiken in de Delta Lake-API, die update-, verwijder- en samenvoegbewerkingen ondersteunt. U kunt bijvoorbeeld de volgende code gebruiken om de prijskolom voor alle rijen bij te werken met een categoriekolomwaarde van 'Accessoires':

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

De gegevenswijzigingen worden vastgelegd in het transactielogboek en nieuwe Parquet-bestanden worden indien nodig in de tabelmap gemaakt.

Tip

Zie de Documentatie voor de Delta Lake-API voor meer informatie over het gebruik van de Data Lake-API.

Een query uitvoeren op een eerdere versie van een tabel

Delta Lake-tabellen ondersteunen versiebeheer via het transactielogboek. In het transactielogboek worden wijzigingen in de tabel vastgelegd, waarbij de tijdstempel en het versienummer voor elke transactie worden vermeld. U kunt deze vastgelegde versiegegevens gebruiken om eerdere versies van de tabel weer te geven, een functie die bekend staat als tijdreizen.

U kunt gegevens ophalen uit een specifieke versie van een Delta Lake-tabel door de gegevens van de locatie van de Delta-tabel te lezen in een dataframe, waarbij u de versie opgeeft die vereist is als een versionAsOf optie:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

U kunt ook een tijdstempel opgeven met behulp van de timestampAsOf optie:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)