Teilen über


Selektives Überschreiben von Daten mit Delta Lake

Azure Databricks nutzt die Delta Lake-Funktionalität, um zwei verschiedene Optionen für selektive Überschreibungen zu unterstützen:

  • Die Option replaceWhere ersetzt automatisch alle Datensätze, die einem bestimmten Prädikat entsprechen.
  • Sie können Datenverzeichnisse mithilfe dynamischer Partitionsüberschreibungen basierend darauf ersetzen, wie Tabellen partitioniert sind.

Für die meisten Vorgänge empfiehlt Databricks die Verwendung von replaceWhere, um anzugeben, welche Daten überschrieben werden sollen.

Important

Wenn Daten versehentlich überschrieben wurden, können Sie die Änderung mithilfe der Wiederherstellung rückgängig machen.

Willkürliches selektives Überschreiben mit replaceWhere

Sie können selektiv nur die Daten überschreiben, die einem beliebigen Ausdruck entsprechen.

Note

SQL erfordert Databricks Runtime 12.2 LTS oder höher.

Mit dem folgenden Befehl werden Ereignisse im Januar in der nach start_date partitionierten Zieltabelle atomisch durch die Daten in replace_data ersetzt:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Dieser Beispielcode schreibt die Daten aus replace_data, überprüft, ob alle Zeilen mit dem Prädikat übereinstimmen, und führt mithilfe der overwrite-Semantik eine atomische Ersetzung durch. Wenn Werte im Vorgang außerhalb der Einschränkung liegen, tritt standardmäßig ein Fehler auf.

Sie können dieses Verhalten zu overwrite-Werten innerhalb des Prädikatbereichs und zu insert-Datensätzen ändern, die außerhalb des angegebenen Bereichs liegen. Deaktivieren Sie dazu die Überprüfung von Einschränkungen, indem Sie spark.databricks.delta.replaceWhere.constraintCheck.enabled mithilfe einer der folgenden Einstellungen auf FALSE festlegen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Legacyverhalten

Beim Legacystandardverhalten hat replaceWhere nur Daten überschrieben, die einem Prädikat über Partitionsspalten entsprachen. Mit diesem Legacymodell würde der folgende Befehl den Monat Januar in der Zieltabelle, die mit date partitioniert wurde, atomisch durch die Daten in df ersetzen:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Wenn Sie das alte Verhalten anwenden möchten, können Sie das Flag spark.databricks.delta.replaceWhere.dataColumns.enabled deaktivieren:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamisches Überschreiben von Partitionen

Das dynamische Überschreiben von Partitionen aktualisiert nur die Partitionen, für die das Schreiben neue Daten committet. Es überschreibt alle vorhandenen Daten in diesen Partitionen und lässt andere unverändert.

Azure Databricks unterstützt zwei Ansätze:

  • REPLACE USING (empfohlen) – Funktioniert für alle Computetypen, einschließlich Databricks SQL Warehouses, serverloser Compute und klassischer Compute. Erfordert keine Einstellung einer Spark-Sitzungskonfiguration.
  • partitionOverwriteMode (Legacy) – Erfordert klassische Rechenleistung und die Konfiguration einer Spark-Sitzung. Wird für Databricks SQL oder serverlose Berechnung nicht unterstützt.

In den folgenden Abschnitten wird die Verwendung der einzelnen Ansätze veranschaulicht.

Dynamische Partition überschreibt mit REPLACE USING

Databricks Runtime 16.3 und höher unterstützt dynamische Partitionsüberschreibungen für partitionierte Tabellen mithilfe von REPLACE USING. Mit dieser Methode können Sie Daten selektiv über alle Computetypen hinweg überschreiben, ohne eine Spark-Sitzungskonfiguration festlegen zu müssen. REPLACE USING ermöglicht ein compute-unabhängiges, atomares Überschreibverhalten, das in Databricks SQL-Warehouses, serverlosem Computing und klassischem Computing funktioniert.

REPLACE USING überschreibt nur die Partitionen, die von den eingehenden Daten bestimmt sind. Alle anderen Partitionen bleiben unverändert.

Im folgenden Beispiel wird veranschaulicht, wie dynamische Partitionen mit REPLACE USING überschrieben werden können. Derzeit können Sie nur SQL verwenden, nicht Python oder Scala. Ausführliche Informationen finden Sie in INSERT der SQL-Sprachreferenz.

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Beachten Sie die folgenden Einschränkungen und Verhaltensweisen für dynamische Partitionsüberschreibungen:

  • Sie müssen den vollständigen Satz der Partitionsspalten der Tabelle in der USING Klausel angeben.
  • Überprüfen Sie immer, ob die geschriebenen Daten nur die erwarteten Partitionen berühren. Eine einzelne Zeile in der falschen Partition kann unbeabsichtigt die gesamte Partition überschreiben.

Wenn Sie eine anpassbarere Abgleichslogik benötigen, als dies REPLACE USING unterstützt wird, z. B. das Behandeln von NULL Werten als gleich, verwenden Sie stattdessen die komplementierende REPLACE ON Logik. Weitere Informationen finden Sie unter INSERT.

Dynamische Partition überschreibt mit partitionOverwriteMode (veraltet)

Important

Dieses Feature befindet sich in der Public Preview.

Databricks Runtime 11.3 LTS und höher unterstützt das dynamische Überschreiben von Partitionen bei partitionierten Tabellen im Überschreibmodus: entweder INSERT OVERWRITE in SQL oder mittels eines DataFrame-Schreibzugriffs mit df.write.mode("overwrite"). Diese Art der Überschreibung ist nur für klassische Rechenleistung verfügbar und nicht für Databricks SQL Warehouses oder serverlose Rechenleistung.

Konfigurieren Sie den dynamischen Partitionsüberschreibmodus, indem Sie die Spark-Sitzungskonfiguration spark.sql.sources.partitionOverwriteMode auf dynamic festlegen. Sie können alternativ die DataFrameWriter Option partitionOverwriteMode auf dynamic setzen. Wenn vorhanden, setzt die abfragespezifische Option den in der Sitzungskonfiguration definierten Modus außer Kraft. Die Standardeinstellung für spark.sql.sources.partitionOverwriteMode ist static.

Im folgenden Beispiel wird die Verwendung von partitionOverwriteMode veranschaulicht:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Beachten Sie die folgenden Einschränkungen und Verhaltensweisen für partitionOverwriteMode:

  • Sie können overwriteSchema nicht auf true setzen.
  • Sie können nicht sowohl partitionOverwriteMode als auch replaceWhere im selben DataFrameWriter-Vorgang angeben.
  • Wenn Sie eine replaceWhere Bedingung mithilfe einer DataFrameWriter Option angeben, wendet Delta Lake diese Bedingung an, um zu steuern, welche Daten überschrieben werden. Diese Option hat Vorrang vor der partitionOverwriteMode Konfiguration auf Sitzungsebene.
  • Überprüfen Sie immer, ob die geschriebenen Daten nur die erwarteten Partitionen berühren. Eine einzelne Zeile in der falschen Partition kann unbeabsichtigt die gesamte Partition überschreiben.