Gegevens selectief overschrijven met Delta Lake

Azure Databricks maakt gebruik van Delta Lake-functionaliteit ter ondersteuning van twee verschillende opties voor selectieve overschrijvingen:

  • De replaceWhere optie vervangt alle records die overeenkomen met een bepaald predicaat.
  • U kunt mappen met gegevens vervangen op basis van hoe tabellen worden gepartitioneerd met behulp van dynamische partitieoverschrijven.

Voor de meeste bewerkingen raadt replaceWhere Databricks aan om op te geven welke gegevens moeten worden overschreven.

Belangrijk

Als gegevens per ongeluk zijn overschreven, kunt u herstellen gebruiken om de wijziging ongedaan te maken.

Willekeurig selectief overschrijven met replaceWhere

U kunt selectief alleen de gegevens overschrijven die overeenkomen met een willekeurige expressie.

Notitie

SQL vereist Databricks Runtime 12.2 LTS of hoger.

De volgende opdracht vervangt gebeurtenissen in januari in de doeltabel, die wordt gepartitioneerd door start_date, door de gegevens in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Deze voorbeeldcode schrijft de gegevens uit replace_data, valideert dat alle rijen overeenkomen met het predicaat en voert een atomische vervanging uit met behulp van overwrite semantiek. Als waarden in de bewerking buiten de beperking vallen, mislukt deze bewerking standaard met een fout.

U kunt dit gedrag wijzigen in overwrite waarden binnen het predicaatbereik en insert records die buiten het opgegeven bereik vallen. Als u dit wilt doen, schakelt u de beperkingscontrole uit door in te stellen spark.databricks.delta.replaceWhere.constraintCheck.enabled op false met behulp van een van de volgende instellingen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Verouderd gedrag

Verouderde standaardgedrag had replaceWhere alleen gegevens overschreven die overeenkomen met een predicaat voor partitiekolommen. Met dit verouderde model vervangt de volgende opdracht de maand januari in de doeltabel, die wordt gepartitioneerd door date, door de gegevens in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Als u terug wilt vallen op het oude gedrag, kunt u de spark.databricks.delta.replaceWhere.dataColumns.enabled vlag uitschakelen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamische partitie overschrijft

Belangrijk

Deze functie is beschikbaar als openbare preview.

Databricks Runtime 11.3 LTS en hoger biedt ondersteuning voor dynamische partitieoverschrijfmodus voor gepartitioneerde tabellen. Voor tabellen met meerdere partities ondersteunt Databricks Runtime 11.3 LTS en hieronder alleen dynamische partitieoverschrijven als alle partitiekolommen van hetzelfde gegevenstype zijn.

Wanneer bewerkingen in de dynamische partitie worden overschreven, overschrijven bewerkingen alle bestaande gegevens in elke logische partitie waarvoor de schrijfbewerking nieuwe gegevens doorvoert. Bestaande logische partities waarvoor de schrijfbewerking geen gegevens bevat, blijven ongewijzigd. Deze modus is alleen van toepassing wanneer gegevens worden geschreven in de overschrijfmodus: INSERT OVERWRITE in SQL of een DataFrame-schrijfbewerking met df.write.mode("overwrite").

Configureer de modus voor het overschrijven van dynamische partities door de Configuratie van de Spark-sessie spark.sql.sources.partitionOverwriteMode in te stellen op dynamic. U kunt dit ook inschakelen door de DataFrameWriter optie partitionOverwriteMode in te stellen op dynamic. Indien aanwezig, overschrijft de queryspecifieke optie de modus die is gedefinieerd in de sessieconfiguratie. De standaardwaarde partitionOverwriteMode is static.

Belangrijk

Controleer of de gegevens die zijn geschreven met dynamische partitie, alleen de verwachte partities overschrijven. Een enkele rij in de onjuiste partitie kan ertoe leiden dat een volledige partitie onbedoeld wordt overschreven.

In het volgende voorbeeld ziet u hoe dynamische partities worden overschreven:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Notitie

  • Dynamische partitieoverschrijfconflicten met de optie replaceWhere voor gepartitioneerde tabellen.
    • Als dynamische partitieoverschrijven is ingeschakeld in de Spark-sessieconfiguratie en replaceWhere als optie DataFrameWriter wordt aangeboden, overschrijft Delta Lake de gegevens volgens de replaceWhere expressie (queryspecifieke opties overschrijven sessieconfiguraties).
    • Er wordt een foutbericht weergegeven als de DataFrameWriter opties zowel dynamische partitie overschrijven als replaceWhere ingeschakeld hebben.
  • U kunt niet opgeven overwriteSchema als true bij het gebruik van dynamische partitie overschrijven.