Gegevens selectief overschrijven met Delta Lake
Azure Databricks maakt gebruik van Delta Lake-functionaliteit ter ondersteuning van twee verschillende opties voor selectieve overschrijvingen:
- De
replaceWhere
optie vervangt alle records die overeenkomen met een bepaald predicaat. - U kunt mappen met gegevens vervangen op basis van hoe tabellen worden gepartitioneerd met behulp van dynamische partitieoverschrijven.
Voor de meeste bewerkingen raadt replaceWhere
Databricks aan om op te geven welke gegevens moeten worden overschreven.
Belangrijk
Als gegevens per ongeluk zijn overschreven, kunt u herstellen gebruiken om de wijziging ongedaan te maken.
Willekeurig selectief overschrijven met replaceWhere
U kunt selectief alleen de gegevens overschrijven die overeenkomen met een willekeurige expressie.
Notitie
SQL vereist Databricks Runtime 12.2 LTS of hoger.
De volgende opdracht vervangt gebeurtenissen in januari in de doeltabel, die wordt gepartitioneerd door start_date
, door de gegevens in replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Deze voorbeeldcode schrijft de gegevens uit replace_data
, valideert dat alle rijen overeenkomen met het predicaat en voert een atomische vervanging uit met behulp van overwrite
semantiek. Als waarden in de bewerking buiten de beperking vallen, mislukt deze bewerking standaard met een fout.
U kunt dit gedrag wijzigen in overwrite
waarden binnen het predicaatbereik en insert
records die buiten het opgegeven bereik vallen. Als u dit wilt doen, schakelt u de beperkingscontrole uit door in te stellen spark.databricks.delta.replaceWhere.constraintCheck.enabled
op false met behulp van een van de volgende instellingen:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Verouderd gedrag
Verouderde standaardgedrag had replaceWhere
alleen gegevens overschreven die overeenkomen met een predicaat voor partitiekolommen. Met dit verouderde model vervangt de volgende opdracht de maand januari in de doeltabel, die wordt gepartitioneerd door date
, door de gegevens in df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Als u terug wilt vallen op het oude gedrag, kunt u de spark.databricks.delta.replaceWhere.dataColumns.enabled
vlag uitschakelen:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Dynamische partitie overschrijft
Belangrijk
Deze functie is beschikbaar als openbare preview.
Databricks Runtime 11.3 LTS en hoger biedt ondersteuning voor dynamische partitieoverschrijfmodus voor gepartitioneerde tabellen. Voor tabellen met meerdere partities ondersteunt Databricks Runtime 11.3 LTS en hieronder alleen dynamische partitieoverschrijven als alle partitiekolommen van hetzelfde gegevenstype zijn.
Wanneer bewerkingen in de dynamische partitie worden overschreven, overschrijven bewerkingen alle bestaande gegevens in elke logische partitie waarvoor de schrijfbewerking nieuwe gegevens doorvoert. Bestaande logische partities waarvoor de schrijfbewerking geen gegevens bevat, blijven ongewijzigd. Deze modus is alleen van toepassing wanneer gegevens worden geschreven in de overschrijfmodus: INSERT OVERWRITE
in SQL of een DataFrame-schrijfbewerking met df.write.mode("overwrite")
.
Configureer de modus voor het overschrijven van dynamische partities door de Configuratie van de Spark-sessie spark.sql.sources.partitionOverwriteMode
in te stellen op dynamic
. U kunt dit ook inschakelen door de DataFrameWriter
optie partitionOverwriteMode
in te stellen op dynamic
. Indien aanwezig, overschrijft de queryspecifieke optie de modus die is gedefinieerd in de sessieconfiguratie. De standaardwaarde partitionOverwriteMode
is static
.
Belangrijk
Controleer of de gegevens die zijn geschreven met dynamische partitie, alleen de verwachte partities overschrijven. Een enkele rij in de onjuiste partitie kan ertoe leiden dat een volledige partitie onbedoeld wordt overschreven.
In het volgende voorbeeld ziet u hoe dynamische partities worden overschreven:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Notitie
- Dynamische partitieoverschrijfconflicten met de optie
replaceWhere
voor gepartitioneerde tabellen.- Als dynamische partitieoverschrijven is ingeschakeld in de Spark-sessieconfiguratie en
replaceWhere
als optieDataFrameWriter
wordt aangeboden, overschrijft Delta Lake de gegevens volgens dereplaceWhere
expressie (queryspecifieke opties overschrijven sessieconfiguraties). - Er wordt een foutbericht weergegeven als de
DataFrameWriter
opties zowel dynamische partitie overschrijven alsreplaceWhere
ingeschakeld hebben.
- Als dynamische partitieoverschrijven is ingeschakeld in de Spark-sessieconfiguratie en
- U kunt niet opgeven
overwriteSchema
alstrue
bij het gebruik van dynamische partitie overschrijven.