Partage via


Tutoriel : Delta Lake

Ce tutoriel présente les opérations Delta Lake courantes sur Azure Databricks, dont les suivantes :

Vous pouvez exécuter l’exemple de code Python, R, Scala et SQL dans cet article à partir d’un notebook attaché à une ressource de calcul Azure Databricks telles qu’un cluster. Vous pouvez également exécuter le code SQL indiqué dans cet article à partir d’une requête associée à un entrepôt SQL dans Databricks SQL.

Préparer les données source

Ce tutoriel s’appuie sur un jeu de données appelé People 10 M. Il contient 10 millions d’enregistrements fictifs qui contiennent des informations sur des personnes, telles que le prénom et le nom, la date de naissance et le salaire. Ce tutoriel suppose que ce jeu de données se situe dans un volume Unity Catalog associé à votre espace de travail Azure Databricks cible.

Si vous souhaitez obtenir le jeu de données People 10 M pour ce tutoriel, effectuez ce qui suit :

  1. Accédez à la page People 10 M dans Kaggle.
  2. Cliquez sur Télécharger pour télécharger un fichier nommé archive.zip dans votre ordinateur local.
  3. Extrayez le fichier nommé export.csv à partir du fichier archive.zip. Le fichier export.csv contient les données pour ce tutoriel.

Pour charger le fichier export.csv dans le volume, effectuez ce qui suit :

  1. Dans la barre latérale, cliquez sur Catalogue.
  2. Dans l’Explorateur de catalogues, accédez et ouvrez le volume où vous souhaitez charger le fichier export.csv.
  3. Cliquez sur Charger vers ce volume.
  4. Glissez-déposez, ou accédez et sélectionnez, le fichier export.csv dans votre ordinateur local.
  5. Cliquez sur Télécharger.

Dans les exemples de code suivants, remplacez /Volumes/main/default/my-volume/export.csv par le chemin d’accès au fichier export.csv dans votre volume cible.

Créez une table

Toutes les tables créées sur Azure Databricks utilisent Delta Lake par défaut. Databricks recommande d’utiliser les tables gérées par Unity Catalog.

Dans l’exemple de code précédent et les exemples de code suivants, remplacez le nom de table main.default.people_10m par votre catalogue en trois parties cible, schéma et nom de table dans Unity Catalog.

Remarque

Delta Lake est la valeur par défaut pour toutes les commandes de lecture, d'écriture et de création de table Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Les opérations précédentes créent une table managée. Pour plus d’informations sur les options disponibles lors de la création d’une table Delta, consultez CREATE TABLE.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser CREATE TABLE LIKE pour créer une table Delta vide qui duplique les propriétés de schéma et de table d’une table Delta source. Cela peut être particulièrement utile lors de la promotion de tables d’un environnement de développement en production, comme illustré dans l’exemple de code suivant :

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Pour créer une table vide, vous pouvez également utiliser l’API DeltaTableBuilder dans Delta Lake pour Python et Scala. Par rapport aux API DataFrameWriter équivalentes, ces API permettent de spécifier plus facilement des informations supplémentaires telles que des commentaires de colonne, des propriétés de table et des colonnes générées.

Important

Cette fonctionnalité est disponible en préversion publique.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Faire un upsert vers une table

Pour fusionner un ensemble de mises à jour et d’insertions dans une table Delta existante, vous utilisez l’instruction la méthode DeltaTable.merge pour Python et Scala et l’instruction MERGE INTO pour SQL. Par exemple, l’exemple suivant extrait des données de la table source et les fusionne dans la table Delta cible. Lorsqu’il existe une ligne correspondante dans les deux tables, Delta Lake met à jour la colonne de données à l’aide de l’expression donnée. En l’absence de ligne correspondante, Delta Lake ajoute une nouvelle ligne. Cette opération porte le nom d’upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Dans SQL, si vous spécifiez *, l’opération met à jour et insère toutes les colonnes dans la table cible, en supposant que la table source a les mêmes colonnes que la table cible. Si la table cible n’a pas les mêmes colonnes, la requête déclenche une erreur d’analyse.

Vous devez spécifier une valeur pour chaque colonne dans votre table lorsque vous effectuez une opération d’insertion (par exemple lorsqu’il n’y a aucune ligne correspondante dans le jeu de données existant). Toutefois, vous n’avez pas besoin de mettre à jour toutes les valeurs.

Pour afficher les résultats, interrogez la table.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Lire une table

Vous accédez aux données d’une table Delta en indiquant le nom de la table ou le chemin de la table, comme cela est montré dans les exemples suivants :

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Écrire dans une table

Delta Lake utilise la syntaxe standard pour écrire des données dans des tables.

Pour ajouter atomiquement de nouvelles données à une table Delta existante, utilisez le mode d’ajout comme illustré dans les exemples suivants :

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Pour remplacer toutes les données dans une table, utilisez le mode remplacement comme dans les exemples suivants :

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Mettre à jour une table

Vous pouvez mettre à jour des données correspondant à un prédicat dans une table Delta. Par exemple, dans un exemple de table people_10m, pour modifier une abréviation dans la colonne gender de M ou F en Male ou Female, vous pouvez exécuter la commande suivante :

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Supprimer d’une table

Vous pouvez supprimer d’une table Delta des données correspondant à un prédicat. Par exemple, dans un exemple de table people_10m, pour supprimer toutes les lignes correspondant aux personnes ayant une valeur dans la colonne birthDate avant 1955, vous pouvez exécuter la commande suivante :

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Important

La suppression supprime les données de la dernière version de la table Delta, mais ne les supprime pas du stockage physique tant que les anciennes versions n’ont pas été explicitement vidées de la mémoire. Pour plus de détails, consultez vacuum.

Afficher l’historique d’une table

Pour afficher l’historique d’une table, vous utilisez la méthode DeltaTable.historypour Python et Scala et l’instruction DESCRIBE HISTORY dans SQL, qui fournit des informations sur la provenance, notamment la version de table, l’opération, l’utilisateur, etc., pour chaque écriture dans une table.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Interroger une version antérieure d’une table (voyage dans le temps)

Le voyage dans le temps Delta Lake vous permet d’interroger un ancien instantané d’une table Delta.

Pour interroger une version antérieure d’une table, spécifiez la version ou le timestamp de la version. Par exemple, pour interroger la version 0 ou timestamp 2024-05-15T22:43:15.000+00:00Z à partir de l’historique précédent, utilisez ce qui suit :

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Pour les timestamps, seules les chaînes de date ou de timestamp sont acceptées, par exemple "2024-05-15T22:43:15.000+00:00" ou "2024-05-15 22:43:15".

Les options DataFrameReader vous permettent de créer un DataFrame à partir d’une table Delta qui est définie sur une version ou un timestamp spécifique de la table, par exemple :

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Pour plus d’informations, consultez Utiliser l’historique des tables Delta Lake.

Optimiser une table

Après avoir apporté de nombreuses modifications à une table, il est possible que vous ayez un grand nombre de petits fichiers. Pour améliorer la rapidité des requêtes de lecture, vous pouvez utiliser l’opération d’optimisation afin de combiner les fichiers de petite taille en fichiers plus gros :

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Organiser les colonnes selon l’ordre de plan

Pour améliorer encore davantage les performances de lecture, vous pouvez colocaliser les informations associées dans le même jeu de fichiers en procédant à une mise en ordre Z. Cette colocalisation est utilisée par les algorithmes ignorant les données Delta Lake pour réduire considérablement la quantité de données qui doit être lue. Pour procéder à une mise en ordre Z, vous spécifiez les colonnes à trier dans l’ordre Z par opération. Par exemple, pour colocaliser par gender, exécutez :

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Pour voir l’ensemble complet des options disponibles quand vous exécutez l’opération d’optimisation, consultez Optimiser la disposition des fichiers de données.

Nettoyer les instantanés avec VACUUM

Delta Lake fournit une isolation des instantanés pour les lectures, ce qui signifie qu’il est sans danger d’exécuter une opération d’optimisation même pendant que d’autres utilisateurs ou travaux interrogent la table. Toutefois, il est préférable de nettoyer les anciens instantanés. Pour ce faire, exécutez l’opération de nettoyage :

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Pour obtenir plus d’informations sur l’opération de nettoyage efficace, consultez Supprimer les fichiers de données inutilisés avec le nettoyage.