Tutoriel : Delta Lake
Ce tutoriel présente les opérations Delta Lake courantes sur Azure Databricks, dont les suivantes :
- Créer une table.
- Faire un upsert vers une table.
- Lire dans une table.
- Afficher l’historique d’une table.
- Interroger une version antérieure d’une table.
- Optimiser une table.
- Ajouter un index d’ordre de plan.
- Nettoyer les fichiers non référencés.
Vous pouvez exécuter l’exemple de code Python, R, Scala et SQL dans cet article à partir d’un notebook attaché à une ressource de calcul Azure Databricks telles qu’un cluster. Vous pouvez également exécuter le code SQL indiqué dans cet article à partir d’une requête associée à un entrepôt SQL dans Databricks SQL.
Préparer les données source
Ce tutoriel s’appuie sur un jeu de données appelé People 10 M. Il contient 10 millions d’enregistrements fictifs qui contiennent des informations sur des personnes, telles que le prénom et le nom, la date de naissance et le salaire. Ce tutoriel suppose que ce jeu de données se situe dans un volume Unity Catalog associé à votre espace de travail Azure Databricks cible.
Si vous souhaitez obtenir le jeu de données People 10 M pour ce tutoriel, effectuez ce qui suit :
- Accédez à la page People 10 M dans Kaggle.
- Cliquez sur Télécharger pour télécharger un fichier nommé
archive.zip
dans votre ordinateur local. - Extrayez le fichier nommé
export.csv
à partir du fichierarchive.zip
. Le fichierexport.csv
contient les données pour ce tutoriel.
Pour charger le fichier export.csv
dans le volume, effectuez ce qui suit :
- Dans la barre latérale, cliquez sur Catalogue.
- Dans l’Explorateur de catalogues, accédez et ouvrez le volume où vous souhaitez charger le fichier
export.csv
. - Cliquez sur Charger vers ce volume.
- Glissez-déposez, ou accédez et sélectionnez, le fichier
export.csv
dans votre ordinateur local. - Cliquez sur Télécharger.
Dans les exemples de code suivants, remplacez /Volumes/main/default/my-volume/export.csv
par le chemin d’accès au fichier export.csv
dans votre volume cible.
Créez une table
Toutes les tables créées sur Azure Databricks utilisent Delta Lake par défaut. Databricks recommande d’utiliser les tables gérées par Unity Catalog.
Dans l’exemple de code précédent et les exemples de code suivants, remplacez le nom de table main.default.people_10m
par votre catalogue en trois parties cible, schéma et nom de table dans Unity Catalog.
Remarque
Delta Lake est la valeur par défaut pour toutes les commandes de lecture, d'écriture et de création de table Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Les opérations précédentes créent une table managée. Pour plus d’informations sur les options disponibles lors de la création d’une table Delta, consultez CREATE TABLE.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser CREATE TABLE LIKE pour créer une table Delta vide qui duplique les propriétés de schéma et de table d’une table Delta source. Cela peut être particulièrement utile lors de la promotion de tables d’un environnement de développement en production, comme illustré dans l’exemple de code suivant :
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Pour créer une table vide, vous pouvez également utiliser l’API DeltaTableBuilder
dans Delta Lake pour Python et Scala. Par rapport aux API DataFrameWriter équivalentes, ces API permettent de spécifier plus facilement des informations supplémentaires telles que des commentaires de colonne, des propriétés de table et des colonnes générées.
Important
Cette fonctionnalité est disponible en préversion publique.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Faire un upsert vers une table
Pour fusionner un ensemble de mises à jour et d’insertions dans une table Delta existante, vous utilisez l’instruction la méthode DeltaTable.merge
pour Python et Scala et l’instruction MERGE INTO pour SQL. Par exemple, l’exemple suivant extrait des données de la table source et les fusionne dans la table Delta cible. Lorsqu’il existe une ligne correspondante dans les deux tables, Delta Lake met à jour la colonne de données à l’aide de l’expression donnée. En l’absence de ligne correspondante, Delta Lake ajoute une nouvelle ligne. Cette opération porte le nom d’upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Dans SQL, si vous spécifiez *
, l’opération met à jour et insère toutes les colonnes dans la table cible, en supposant que la table source a les mêmes colonnes que la table cible. Si la table cible n’a pas les mêmes colonnes, la requête déclenche une erreur d’analyse.
Vous devez spécifier une valeur pour chaque colonne dans votre table lorsque vous effectuez une opération d’insertion (par exemple lorsqu’il n’y a aucune ligne correspondante dans le jeu de données existant). Toutefois, vous n’avez pas besoin de mettre à jour toutes les valeurs.
Pour afficher les résultats, interrogez la table.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Lire une table
Vous accédez aux données d’une table Delta en indiquant le nom de la table ou le chemin de la table, comme cela est montré dans les exemples suivants :
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Écrire dans une table
Delta Lake utilise la syntaxe standard pour écrire des données dans des tables.
Pour ajouter atomiquement de nouvelles données à une table Delta existante, utilisez le mode d’ajout comme illustré dans les exemples suivants :
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Pour remplacer toutes les données dans une table, utilisez le mode remplacement comme dans les exemples suivants :
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Mettre à jour une table
Vous pouvez mettre à jour des données correspondant à un prédicat dans une table Delta. Par exemple, dans un exemple de table people_10m
, pour modifier une abréviation dans la colonne gender
de M
ou F
en Male
ou Female
, vous pouvez exécuter la commande suivante :
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Supprimer d’une table
Vous pouvez supprimer d’une table Delta des données correspondant à un prédicat. Par exemple, dans un exemple de table people_10m
, pour supprimer toutes les lignes correspondant aux personnes ayant une valeur dans la colonne birthDate
avant 1955
, vous pouvez exécuter la commande suivante :
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Important
La suppression supprime les données de la dernière version de la table Delta, mais ne les supprime pas du stockage physique tant que les anciennes versions n’ont pas été explicitement vidées de la mémoire. Pour plus de détails, consultez vacuum.
Afficher l’historique d’une table
Pour afficher l’historique d’une table, vous utilisez la méthode DeltaTable.history
pour Python et Scala et l’instruction DESCRIBE HISTORY dans SQL, qui fournit des informations sur la provenance, notamment la version de table, l’opération, l’utilisateur, etc., pour chaque écriture dans une table.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Interroger une version antérieure d’une table (voyage dans le temps)
Le voyage dans le temps Delta Lake vous permet d’interroger un ancien instantané d’une table Delta.
Pour interroger une version antérieure d’une table, spécifiez la version ou le timestamp de la version. Par exemple, pour interroger la version 0 ou timestamp 2024-05-15T22:43:15.000+00:00Z
à partir de l’historique précédent, utilisez ce qui suit :
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Pour les timestamps, seules les chaînes de date ou de timestamp sont acceptées, par exemple "2024-05-15T22:43:15.000+00:00"
ou "2024-05-15 22:43:15"
.
Les options DataFrameReader vous permettent de créer un DataFrame à partir d’une table Delta qui est définie sur une version ou un timestamp spécifique de la table, par exemple :
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Pour plus d’informations, consultez Utiliser l’historique des tables Delta Lake.
Optimiser une table
Après avoir apporté de nombreuses modifications à une table, il est possible que vous ayez un grand nombre de petits fichiers. Pour améliorer la rapidité des requêtes de lecture, vous pouvez utiliser l’opération d’optimisation afin de combiner les fichiers de petite taille en fichiers plus gros :
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Organiser les colonnes selon l’ordre de plan
Pour améliorer encore davantage les performances de lecture, vous pouvez colocaliser les informations associées dans le même jeu de fichiers en procédant à une mise en ordre Z. Cette colocalisation est utilisée par les algorithmes ignorant les données Delta Lake pour réduire considérablement la quantité de données qui doit être lue. Pour procéder à une mise en ordre Z, vous spécifiez les colonnes à trier dans l’ordre Z par opération. Par exemple, pour colocaliser par gender
, exécutez :
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Pour voir l’ensemble complet des options disponibles quand vous exécutez l’opération d’optimisation, consultez Optimiser la disposition des fichiers de données.
Nettoyer les instantanés avec VACUUM
Delta Lake fournit une isolation des instantanés pour les lectures, ce qui signifie qu’il est sans danger d’exécuter une opération d’optimisation même pendant que d’autres utilisateurs ou travaux interrogent la table. Toutefois, il est préférable de nettoyer les anciens instantanés. Pour ce faire, exécutez l’opération de nettoyage :
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Pour obtenir plus d’informations sur l’opération de nettoyage efficace, consultez Supprimer les fichiers de données inutilisés avec le nettoyage.