Partager via


Classe DataFrameWriterV2

Interface utilisée pour écrire un DataFrame dans un stockage externe à l’aide de l’API v2.

Pour la plupart des cas d’usage avec des tables Databricks et Delta Lake, DataFrameWriterV2 fournit des options plus puissantes et flexibles que l’original DataFrameWriter :

  • Meilleure prise en charge des propriétés de table
  • Contrôle plus précis sur le partitionnement
  • Fonctionnalités de remplacement conditionnel
  • Prise en charge du clustering
  • Sémantique plus claire pour les opérations de création ou de remplacement

Prend en charge Spark Connect

Syntaxe

Permet DataFrame.writeTo(table) d’accéder à cette interface.

Méthodes

Méthode Description
using(provider) Spécifie un fournisseur pour la source de données de sortie sous-jacente.
option(key, value) Ajoutez une option d’écriture. Par exemple, pour créer une table managée : df.writeTo("test").using("delta").option("path", "s3://test").createOrReplace().
options(**options) Ajouter des options d’écriture.
tableProperty(property, value) Ajouter une propriété de table. Par exemple, utilisez cette méthode tableProperty("location", "s3://test") pour créer une table EXTERNAL (non managée).
partitionedBy(col, *cols) Partitionnez la table de sortie créée par create, createOrReplace ou remplacez à l’aide des colonnes ou transformations données.
clusterBy(col, *cols) Clusters les données par les colonnes données pour optimiser les performances des requêtes.
create() Créez une table à partir du contenu de la trame de données.
replace() Remplacez une table existante par le contenu de la trame de données.
createOrReplace() Créez une table ou remplacez une table existante par le contenu de la trame de données.
append() Ajoutez le contenu de la trame de données à la table de sortie.
overwrite(condition) Remplacez les lignes correspondant à la condition de filtre donnée avec le contenu de la trame de données dans la table de sortie.
overwritePartitions() Remplacez toutes les partitions pour lesquelles la trame de données contient au moins une ligne avec le contenu de la trame de données dans la table de sortie.

Exemples

Création d’une table

# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()

# Create with a specific provider
df.writeTo("my_table").using("parquet").create()

Partitionnement des données

# Partition by single column
df.writeTo("my_table") \
    .partitionedBy("year") \
    .create()

# Partition by multiple columns
df.writeTo("my_table") \
    .partitionedBy("year", "month") \
    .create()

# Partition using transform functions
from pyspark.sql.functions import years, months, days

df.writeTo("my_table") \
    .partitionedBy(years("date"), months("date")) \
    .create()

Définition des propriétés de la table

# Add table properties
df.writeTo("my_table") \
    .tableProperty("key1", "value1") \
    .tableProperty("key2", "value2") \
    .create()

Utilisation des options

# Add write options
df.writeTo("my_table") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", "10000") \
    .create()

# Add multiple options at once
df.writeTo("my_table") \
    .options(compression="snappy", maxRecordsPerFile="10000") \
    .create()

Données de clustering

# Cluster by columns for query optimization
df.writeTo("my_table") \
    .clusterBy("user_id", "timestamp") \
    .create()

Opérations de remplacement

# Replace existing table
df.writeTo("my_table") \
    .using("parquet") \
    .replace()

# Create or replace (safe operation)
df.writeTo("my_table") \
    .using("parquet") \
    .createOrReplace()

Opérations d’ajout

# Append to existing table
df.writeTo("my_table").append()

Opérations de remplacement

from pyspark.sql.functions import col

# Overwrite specific rows based on condition
df.writeTo("my_table") \
    .overwrite(col("date") == "2025-01-01")

# Overwrite entire partitions
df.writeTo("my_table") \
    .overwritePartitions()

Chaînage de méthodes

# Combine multiple configurations
df.writeTo("my_table") \
    .using("parquet") \
    .option("compression", "snappy") \
    .tableProperty("description", "User data table") \
    .partitionedBy("year", "month") \
    .clusterBy("user_id") \
    .createOrReplace()