Partager via


Vue d’ensemble de Linux Foundation Delta Lake

Pour plus de clarté, cet article a été adapté à partir de son équivalent d’origine ici. Cet article vous permet de découvrir rapidement les principales fonctionnalités de Delta Lake. L’article fournit des extraits de code qui montrent comment lire et écrire dans des tables Delta Lake à partir de requêtes interactives, par lots et de streaming. Les extraits de code sont également disponibles dans un ensemble de notebooks PySpark ici, Scala ici et C# ici

Nous allons aborder les sujets suivants :

  • Créer une table
  • Lire les données
  • Mettre à jour les données d’une table
  • Remplacer les données d’une table
  • Mettre à jour sous conditions sans remplacement
  • Lire des versions antérieures de données à l’aide de Voyage dans le temps
  • Écrire un flux de données dans une table
  • Lire un flux de modifications à partir d’une table
  • Prise en charge SQL

Configuration

Veillez à modifier l’extrait suivant en fonction de votre environnement.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Retourne comme résultat :

« /delta/delta-table-335323 »

Créer une table

Pour créer une table Delta Lake, écrivez un élément DataFrame à partir d’un DataFrame au format delta. Vous pouvez remplacer le format Parquet, CSV, JSON, et ainsi de suite, par delta.

Le code suivant vous montre comment créer une table Delta Lake à l’aide du schéma déduit à partir de votre DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Retourne comme résultat :

id
0
1
2
3
4

Lire les données

Vous lisez les données de votre table Delta Lake en spécifiant le chemin des fichiers et le format delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Retourne comme résultat :

id
1
3
4
0
2

L’ordre des résultats est différent de celui indiqué ci-dessus, car aucune commande n’a été spécifiée explicitement avant la sortie des résultats.

Mettre à jour les données d’une table

Delta Lake prend en charge plusieurs opérations pour modifier les tables en utilisant des API DataFrame standard. Ces opérations sont une des améliorations que le format delta ajoute. L’exemple suivant exécute un traitement par lots pour remplacer les données de la table.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Retourne comme résultat :

id
7
8
5
9
6

Ici, vous pouvez voir que les cinq enregistrements ont été mis à jour pour contenir de nouvelles valeurs.

Enregistrer en tant que tables de catalogue

Delta Lake peut écrire dans des tables de catalogue gérées ou externes.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Retourne comme résultat :

database tableName isTemporary
default externaldeltatable false
default manageddeltatable false

À l’aide de ce code, vous avez créé une table dans le catalogue depuis un DataFrame existant ; elle est appelée « table gérée ». Ensuite, vous avez défini une nouvelle table externe dans le catalogue qui utilise un emplacement existant ; elle est appelée « table externe ». Dans la sortie, vous pouvez voir les deux tables répertoriées dans le catalogue, peu importe la façon dont elles ont été créées.

Vous pouvez maintenant examiner les propriétés étendues de ces deux tables

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Retourne comme résultat :

col_name data_type comment
id bigint null
Informations détaillées sur la table
Base de données default
Table de charge de travail manageddeltatable
Propriétaire trusted-service-user
Heure de création Sat Apr 25 00:35:34 UTC 2020
Dernier accès Thu Jan 01 00:00:00 UTC 1970
Créé par Spark 2.4.4.2.6.99.201-11401300
Type MANAGED
Fournisseur delta
Propriétés de la table [transient_lastDdlTime=1587774934]
Statistiques 2407 bytes
Emplacement abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Bibliothèque Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriétés de stockage [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Retourne comme résultat :

col_name data_type comment
id bigint null
Informations détaillées sur la table
Base de données default
Table de charge de travail externaldeltatable
Propriétaire trusted-service-user
Heure de création Sat Apr 25 00:35:38 UTC 2020
Dernier accès Thu Jan 01 00:00:00 UTC 1970
Créé par Spark 2.4.4.2.6.99.201-11401300
Type EXTERNAL
Fournisseur DELTA
Propriétés de la table [transient_lastDdlTime=1587774938]
Emplacement abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Bibliothèque Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriétés de stockage [serialization.format=1]

Mettre à jour sous conditions sans remplacement

Delta Lake fournit des API programmatiques pour mettre à jour, supprimer et fusionner de façon conditionnelle (cette commande est communément appelée « upsert ») des données dans des tables.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Retourne comme résultat :

id
106
108
5
7
9

Ici, vous venez d’ajouter 100 à chaque ID pair.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Retourne comme résultat :

id
5
7
9

Remarquez que chaque ligne paire a été supprimée.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Retourne comme résultat :

id
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Ici, vous avez une combinaison des données existantes. La valeur -1 a été affectée aux données existantes dans le chemin du code de mise à jour (WhenMatched). Les données qui ont été créées en haut de l’extrait de code, et qui ont été ajoutées via le chemin du code d’insertion (WhenNotMatched), ont également été ajoutées.

Historique

Delta Lake permet d’examiner l’historique d’une table. Autrement dit, les modifications apportées à la table Delta sous-jacente. La cellule ci-dessous montre comme il est simple d’inspecter l’historique.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Retourne comme résultat :

version timestamp userId userName opération operationParameters travail notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 null null MERGE [predicate -> (oldData.ID = newData.ID)] null null null 3 null false
3 2020-04-25 00:36:08 null null Suppression [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null false
2 2020-04-25 00:35:51 null null UPDATE [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrite, partitionBy -> []] null null null 0 null false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfExists, partitionBy -> []] null null null null null true

Ici, vous pouvez voir toutes les modifications apportées aux extraits de code ci-dessus.

Lire des versions antérieures de données à l’aide de Voyage dans le temps

Il est possible d’interroger les captures instantanées précédentes de votre table Delta Lake en utilisant une fonctionnalité appelée Voyage dans le temps. Si vous souhaitez accéder aux données que vous avez remplacées, vous pouvez interroger un instantané de la table préexistant au remplacement du premier jeu de données à l’aide de l’option versionAsOf.

Après avoir exécuté la cellule ci-dessous, vous devez voir le premier jeu de données existant avant que vous ne le remplaciez. Voyage dans le temps est une fonctionnalité puissante qui exploite les capacités du journal des transactions Delta Lake pour accéder aux données qui ne se trouvent plus dans la table. La suppression de l’option version 0 (ou la spécification de version 1) vous permet de voir à nouveau les données plus récentes. Pour plus d’informations, consultez Interroger un instantané de table antérieur.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Retourne comme résultat :

id
0
1
4
3
2

Ici, vous pouvez constater que vous avez rétabli la version la plus ancienne des données.

Écrire un flux de données dans une table

Vous pouvez également écrire dans une table Delta Lake à l’aide de Structured Streaming (Streaming structuré) de Spark. Le journal des transactions Delta Lake garantit un seul et unique traitement, même si d’autres flux ou requêtes par lot s’exécutent simultanément sur la table. Par défaut, les flux s’exécutent en mode ajout, ce qui adjoint de nouveaux enregistrements à la table.

Pour plus d’informations sur l’intégration de Delta Lake à Structured streaming, consultez Lectures et écritures de streaming de table.

Dans les cellules ci-dessous, voici les actions que nous accomplissons :

  • Cellule 30 Montrer les données ajoutées récemment
  • Cellule 31 Inspecter l’historique
  • Cellule 32 Arrêter le travail de streaming structuré
  • Cellule 33 Inspecter l’historique <-- Vous remarquerez que les ajouts ont été arrêtés

Tout d’abord, vous allez configurer un travail de streaming Spark simple pour générer une séquence et faire en sorte que le travail écrive dans votre table Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Lire un flux de modifications à partir d’une table

Pendant que le flux s’écrit dans la table Delta Lake, vous pouvez aussi lire cette table en tant que source de streaming. Par exemple, vous pouvez démarrer une autre requête de streaming qui imprime toutes les modifications apportées à la table Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Retourne comme résultat :

id
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Retourne comme résultat :

version timestamp opération operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 Suppression [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Ici, vous supprimez quelques-unes des colonnes les moins intéressantes pour simplifier l’expérience de consultation de la vue d’historique.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Retourne comme résultat :

version timestamp opération operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 Suppression [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Convertir Parquet en Delta

Vous pouvez effectuer une conversion sur place du format Parquet en Delta.

Ici, vous allez tester si la table existante est ou non au format delta.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Retourne comme résultat :

Faux

Vous allez maintenant convertir les données au format delta et vérifier que l’opération a réussi.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Retourne comme résultat :

Vrai

Prise en charge SQL

Delta prend en charge les commandes utilitaires de table via SQL. Vous pouvez utiliser SQL pour :

  • Obtenir l’historique d’une table Delta
  • Nettoyer une table Delta
  • Convertir un fichier Parquet en Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Retourne comme résultat :

version timestamp userId userName opération operationParameters travail notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 null null STREAMING UPDATE [outputMode -> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null MERGE [predicate -> (ol... null null null 3 null false
3 2020-04-25 00:36:08 null null Suppression [predicate -> ["(... null null null 2 null false
2 2020-04-25 00:35:51 null null UPDATE [predicate -> ((i... null null null 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrit... null null null 0 null false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Retourne comme résultat :

path
abfss://data@arca...

Maintenant, vous allez vérifier qu’un tableau n’est pas un tableau au format delta. Ensuite, vous allez convertir le tableau au format delta en utilisant Spark SQL et vérifier qu’il a été converti correctement.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Retourne comme résultat :

Vrai

Pour obtenir une documentation complète, consultez la page Documentation de Delta Lake.

Pour plus d’informations, consultez Projet Delta Lake.

Étapes suivantes