Utiliser des tables delta dans Spark

Effectué

Vous pouvez utiliser des tables delta (ou des fichiers de format delta) pour récupérer et modifier des données de plusieurs façons.

Utilisation de Spark SQL

La façon la plus courante d’utiliser des données dans des tables delta dans Spark consiste à utiliser Spark SQL. Vous pouvez incorporer des instructions SQL dans d’autres langages (tels que PySpark ou Scala) à l’aide de la bibliothèque spark.sql . Par exemple, le code suivant insère une ligne dans la table products .

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Vous pouvez également utiliser la fonctionnalité magique %%sql dans un notebook pour exécuter des instructions SQL.

%%sql

UPDATE products
SET ListPrice = 2.49 WHERE ProductId = 1;

Utiliser l’API Delta

Lorsque vous souhaitez utiliser des fichiers delta plutôt que des tables de catalogue, il peut être plus simple d’utiliser l’API Delta Lake. Vous pouvez créer une instance d’un DeltaTable à partir d’un emplacement de dossier contenant des fichiers au format delta, puis utiliser l’API pour modifier les données de la table.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Recourir au voyage dans le temps pour utiliser le contrôle de versions de table

Les modifications apportées aux tables delta sont journalisées dans le journal des transactions de la table. Vous pouvez utiliser les transactions journalisées pour afficher l’historique des modifications apportées à la table et récupérer les versions antérieures des données (appelées voyages temporels)

Pour afficher l’historique d’une table, vous pouvez utiliser la DESCRIBE commande SQL comme indiqué ici.

%%sql

DESCRIBE HISTORY products

Le résultat de cette instruction indique les transactions qui ont été appliquées à la table, comme illustré ici (certaines colonnes ont été omises) :

Version horodatage opération paramètres d'opération
2 2023-04-04T21:46:43Z MISE À JOUR {"predicate » :"(ProductId = 1)"}
1 2023-04-04T21:42:48Z ÉCRIRE {"mode » :"Ajouter »,"partitionBy » :"[]"}
0 2023-04-04T20:04:23Z CRÉER TABLE {"isManaged » :"true »,"description » :null,"partitionBy » :"[] »,"properties » :"{}"}

Pour afficher l’historique d’une table externe, vous pouvez spécifier l’emplacement du dossier au lieu du nom de la table.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Vous pouvez récupérer des données à partir d’une version spécifique des données en lisant l’emplacement du fichier delta dans un dataframe, en spécifiant la version requise comme versionAsOf option :

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Vous pouvez également spécifier un timestamp à l’aide de l’option timestampAsOf :

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)