Mettre à jour un schéma de table Delta Lake
Delta Lake vous permet de mettre à jour le schéma d’une table. Les types d’applications suivants sont pris en charge :
- Ajout de nouvelles colonnes (à des positions arbitraires)
- Réorganisation des colonnes existantes
- Attribution d’un nouveau nom aux colonnes existantes
Vous pouvez apporter ces modifications explicitement à l’aide de DDL ou implicitement à l’aide de DML.
Important
Une mise à jour d’un schéma de table Delta est une opération qui entre en conflit avec toutes les opérations d’écriture Delta simultanées.
Lorsque vous mettez à jour un schéma de table Delta, les flux qui lisent à partir de cette table se terminent. Si vous souhaitez que le flux continue, vous devez le redémarrer. Pour plus d’informations, consultez Considérations relatives à la production pour les applications Structured Streaming.
Mettre à jour explicitement le schéma pour ajouter des colonnes
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Par défaut, la nullité est true
.
Pour ajouter une colonne à un champ imbriqué, utilisez :
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
est :
- root
| - colA
| - colB
| +-field1
| +-field2
le schéma après est :
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Notes
L’ajout de colonnes imbriquées est pris en charge uniquement pour les structs. Les tableaux et les mappages ne sont pas pris en charge.
Mettre à jour explicitement le schéma pour modifier le commentaire ou l’ordre des colonnes
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Pour modifier une colonne dans un champ imbriqué, utilisez :
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
est :
- root
| - colA
| - colB
| +-field1
| +-field2
le schéma après est :
- root
| - colA
| - colB
| +-field2
| +-field1
Mettre à jour explicitement le schéma pour remplacer les colonnes
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Par exemple, lors de l’exécution de la commande DDL suivante :
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
si le schéma avant est :
- root
| - colA
| - colB
| +-field1
| +-field2
le schéma après est :
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Mettre à jour explicitement le schéma pour renommer des colonnes
Remarque
Cette fonctionnalité est disponible sur Databricks Runtime 10.4 LTS et versions ultérieures.
Pour renommer des colonnes sans réécrire les données existantes des colonnes, vous devez activer le mappage de colonnes pour la table. Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.
Pour renommer une colonne :
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Pour renommer un champ imbriqué :
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Par exemple, lorsque vous exécutez la commande suivante :
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Si le schéma avant est :
- root
| - colA
| - colB
| +-field1
| +-field2
Alors le schéma après est :
- root
| - colA
| - colB
| +-field001
| +-field2
Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.
Mettre à jour explicitement le schéma pour supprimer des colonnes
Remarque
Cette fonctionnalité est disponible dans Databricks Runtime 11.3 LTS et versions ultérieures.
Pour supprimer des colonnes en tant qu’opération de métadonnées uniquement sans réécriture de fichiers de données, vous devez activer le mappage de colonnes pour la table. Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.
Important
La suppression d’une colonne des métadonnées ne supprime pas les données sous-jacentes de la colonne dans les fichiers. Pour vider les données de colonne supprimées, vous pouvez utiliser REORG TABLE pour réécrire des fichiers. Vous pouvez ensuite utiliser VACUUM pour supprimer physiquement les fichiers qui contiennent les données de colonne supprimées.
Pour supprimer une colonne :
ALTER TABLE table_name DROP COLUMN col_name
Pour supprimer plusieurs colonnes :
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Mettre à jour explicitement le schéma pour modifier le type ou le nom de la colonne
Vous pouvez modifier le type ou le nom d’une colonne ou supprimer une colonne en réécrivant la table. Pour ce faire, utilisez l’option overwriteSchema
.
L’exemple suivant illustre la modification d’un type de colonne :
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
L’exemple suivant illustre la modification d’un nom de colonne :
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Activer l’évolution du schéma
Vous pouvez activer l’évolution du schéma en effectuant l’une des opérations suivantes :
- Définissez le
.option("mergeSchema", "true")
sur une opération DataFrame Sparkwrite
ouwriteStream
. Consultez Activer l’évolution du schéma pour les écritures pour ajouter de nouvelles colonnes. - Utilisez la syntaxe
MERGE WITH SCHEMA EVOLUTION
. Consultez Syntaxe d’évolution de schéma pour la fusion. - Définissez la conf Spark
spark.databricks.delta.schema.autoMerge.enabled
surtrue
pour la session SparkSession actuelle.
Databricks recommande d’activer l’évolution du schéma pour chaque opération d’écriture plutôt que de définir une conf Spark.
Lorsque vous utilisez des options ou une syntaxe pour activer l’évolution du schéma dans une opération d’écriture, cela est prioritaire sur la conf Spark.
Remarque
Il n’existe aucune clause d’évolution de schéma pour les instructions INSERT INTO
.
Activer l’évolution du schéma pour les écritures afin d’ajouter de nouvelles colonnes
Les colonnes qui sont présentes dans la requête d’approvisionnement mais qui sont absentes de la table cible sont automatiquement ajoutées dans le cadre d’une transaction d’écriture lorsque l’évolution du schéma est activée. Consultez Activer l’évolution du schéma.
La casse est conservée lors de l’ajout d’une nouvelle colonne. Les nouvelles colonnes sont ajoutées à la fin du schéma de la table. Si les colonnes supplémentaires se trouvent dans un struct, elles sont ajoutées à la fin du struct dans la table cible.
L’exemple suivant illustre l’utilisation de l’option mergeSchema
avec le chargeur automatique. Consultez Qu’est-ce que Auto Loader ?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
L’exemple suivant illustre l’utilisation de l’option mergeSchema
avec une opération d’écriture par lots :
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Évolution automatique du schéma pour la fusion Delta Lake
L’évolution du schéma permet aux utilisateurs de résoudre les incompatibilités de schéma entre la table cible et la table source dans la fusion. Il gère les deux cas suivants :
- Une colonne de la table source n’est pas présente dans la table cible. La nouvelle colonne est ajoutée au schéma cible et ses valeurs sont insérées ou mises à jour à l’aide des valeurs sources.
- Une colonne dans la table cible n’est pas présente dans la table source. Le schéma cible reste inchangé ; les valeurs de la colonne cible supplémentaire sont laissées inchangées (pour
UPDATE
) ou définies surNULL
(pourINSERT
).
Vous devez activer manuellement l’évolution automatique du schéma. Consultez Activer l’évolution du schéma.
Remarque
Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez spécifier les colonnes et les champs struct présents dans la table source par leur nom dans les actions d’insertion ou de mise à jour. Dans Databricks Runtime 11.3 LTS et versions antérieures, seules les actions INSERT *
ou UPDATE SET *
peuvent être utilisées pour l’évolution du schéma avec la fusion.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser l’évolution du schéma avec des structs imbriqués à l’intérieur des cartes, comme map<int, struct<a: int, b: int>>
.
Syntaxe d’évolution de schéma pour la fusion
Dans Databricks Runtime 15.2 et versions ultérieures, vous pouvez spécifier une évolution de schéma dans une instruction de fusion en utilisant SQL ou les API Table Delta :
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Exemples d’opérations de fusion avec l’évolution du schéma
Voici quelques exemples des effets de l’opération merge
avec et sans évolution du schéma.
Colonnes | Requête (dans SQL) | Comportement sans évolution du schéma (par défaut) | Comportement avec évolution du schéma |
---|---|---|---|
Colonnes cibles : key, value Colonnes sources : key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Le schéma de la table reste inchangé. Seules les colonnes key et value sont mises à jour/insérées. |
Le schéma de la table est modifié en (key, value, new_value) . Les enregistrements existants avec des correspondances sont mis à jour avec le value et le new_value dans la source. Les nouvelles lignes sont insérées avec le schéma (key, value, new_value) . |
Colonnes cibles : key, old_value Colonnes sources : key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Les actions UPDATE et INSERT génèrent une erreur parce que la colonne cible old_value ne figure pas dans la source |
Le schéma de la table est modifié en (key, old_value, new_value) . Les enregistrements existants avec des correspondances sont mis à jour avec le new_value dans la source, laissant le old_value inchangé. Les nouveaux enregistrements sont insérés avec les key , new_value et NULL spécifiés pour le old_value . |
Colonnes cibles : key, old_value Colonnes sources : key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE génère une erreur parce que la colonne new_value n’existe pas dans la table cible. |
Le schéma de la table est modifié en (key, old_value, new_value) . Les enregistrements existants avec des correspondances sont mis à jour avec le new_value dans la source, laissant le old_value inchangé, et les enregistrements sans correspondance ont NULL entré pour le new_value . Voir la remarque (1). |
Colonnes cibles : key, old_value Colonnes sources : key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT génère une erreur parce que la colonne new_value n’existe pas dans la table cible. |
Le schéma de la table est modifié en (key, old_value, new_value) . Les nouveaux enregistrements sont insérés avec les key , new_value et NULL spécifiés pour le old_value . Les enregistrements existants ont NULL entré pour new_value , laissant old_value inchangé. Voir la remarque (1). |
(1) Ce comportement est disponible dans Databricks Runtime 12.2 LTS et versions ultérieures. Databricks Runtime 11.3 LTS et versions antérieures présentent une erreur dans cette condition.
Exclure des colonnes avec la fusion Delta Lake
Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser des clauses EXCEPT
dans des conditions de fusion pour exclure explicitement des colonnes. Le comportement du mot clé EXCEPT
varie selon que l’évolution du schéma est activée.
Si l’évolution du schéma est désactivée, le mot clé EXCEPT
s’applique à la liste des colonnes de la table cible et permet d’exclure les colonnes des actions UPDATE
ouINSERT
. Les colonnes exclues sont définies sur null
.
Si l’évolution du schéma est activée, le mot clé EXCEPT
s’applique à la liste des colonnes de la table source et permet d’exclure les colonnes de l’évolution du schéma. Une nouvelle colonne dans la source qui n’est pas présente dans la cible n’est pas ajoutée au schéma cible si elle est répertoriée dans la clause EXCEPT
. Les colonnes exclues qui sont déjà présentes dans la cible sont définies sur null
.
Les exemples ci-dessous illustrent cette syntaxe :
Colonnes | Requête (dans SQL) | Comportement sans évolution du schéma (par défaut) | Comportement avec évolution du schéma |
---|---|---|---|
Colonnes cibles : id, title, last_updated Colonnes sources : id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Les nouvelles lignes sont insérées à l’aide de valeurs pour id et title . Le champ exclu last_updated est défini sur null . Le champ review est ignoré, car il n’est pas dans la cible. |
Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le schéma est évolué pour ajouter le champ review . Les nouvelles lignes sont insérées à l’aide de tous les champs sources, à l’exception de last_updated , qui est défini sur null . |
Colonnes cibles : id, title, last_updated Colonnes sources : id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT génère une erreur parce que la colonne internal_count n’existe pas dans la table cible. |
Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le champ review est ajouté à la table cible, mais le champ internal_count est ignoré. Pour les nouvelles lignes insérées, le champ last_updated est défini sur null . |
Traitement des colonnes NullType
dans les mises à jour de schéma
Comme parquet ne prend pas en charge NullType
, NullType
les colonnes sont supprimées du tableau lors de l’écriture dans les tables delta, mais elles sont toujours stockées dans le schéma. Lorsqu’un type de données différent est reçu pour cette colonne, Delta Lake fusionne le schéma avec le nouveau type de données. Si Delta Lake reçoit une NullType
pour une colonne existante, l’ancien schéma est conservé et la nouvelle colonne est supprimée pendant l’écriture.
NullType
la diffusion en continu n’est pas prise en charge. Étant donné que vous devez définir des schémas lors de l’utilisation de la diffusion en continu, cela doit être très rare. NullType
n’est pas non plus accepté pour les types complexes tels que ArrayType
et MapType
.
Remplacer un schéma de table
Par défaut, le remplacement des données dans une table ne remplace pas le schéma. Lors du remplacement d’une table à l’aide mode("overwrite")
de sans replaceWhere
, vous souhaiterez peut-être quand même remplacer le schéma des données en cours d’écriture. Vous remplacez le schéma et le partitionnement de la table en affectant à l’option overwriteSchema
à la true
:
df.write.option("overwriteSchema", "true")
Important
Vous ne pouvez pas spécifier overwriteSchema
comme true
lors de l’utilisation du remplacement de partition dynamique.