Selektywne zastępowanie danych za pomocą usługi Delta Lake

Delta Lake ma następujące odrębne opcje częściowego nadpisywania:

Option Przypadek użycia Obsługiwane typy obliczeniowe Minimalna wersja
REPLACE WHERE Atomowo nadpisywać wiersze pasujące do predykatu. Służy do zastępowania ze stałym warunkiem dopasowania, takim jak colA = 5 lub int_col IN (1, 2, 3). Wszystkie typy obliczeniowe. SQL w środowisku Databricks Runtime 12.2 LTS lub nowszym. Python i Scala w środowisku Databricks Runtime 9.1 LTS lub nowszym.
REPLACE USING Dynamiczne nadpisywanie danych. Zamienia wszystkie wiersze zgodne z określonymi kolumnami na podstawie porównania równości wartości kolumn w podanym zestawie danych. Wszystkie typy obliczeniowe. Sql w środowisku Databricks Runtime 16.3 lub nowszym. Python i Scala w środowisku Databricks Runtime 18.2 lub nowszym.
REPLACE ON Nadpisywanie danych dynamicznych za pomocą wyrażenia logicznego. Służy do zastępowania ze złożonym lub bezpiecznym dla wartości NULL warunkiem dopasowania, takim jak s.colA <=> t.colA AND s.colB <=> t.colB. Wszystkie typy obliczeniowe. Sql w środowisku Databricks Runtime 17.1 lub nowszym. Python i Scala w środowisku Databricks Runtime 18.2 lub nowszym.
partitionOverwriteMode Starszy mechanizm nadpisywania partycji dynamicznych, który nadpisuje wszystkie istniejące dane w każdej partycji, do której zostaną zapisane nowe dane. Nie jest zalecane w przypadku nowych obciążeń. Język SQL obsługuje tylko klasyczne obliczenia. Python i Scala obsługują wszystkie typy obliczeniowe. SQL, Python i Scala w środowisku Databricks Runtime 11.3 LTS i nowszych wersjach.

W większości przypadków użycia Databricks zaleca użycie REPLACE USING lub REPLACE WHERE. Używaj REPLACE ON tylko wtedy, gdy przypadek użycia wymaga złożonych lub bezpiecznych warunków dopasowania o wartości NULL.

Aby uzyskać szczegółowe informacje na temat zachowania zastępczego każdej opcji, zobacz INSERT. Aby uzyskać pełną listę DataFrameWriter opcji usługi Delta Lake, zobacz Delta Lake i Apache Iceberg.

W języku Scala i Python nie można używać replaceOn i replaceUsing w połączeniu z replaceWhere, partitionOverwriteMode lub overwriteSchema.

W przypadku pustych zapytań źródłowych zarówno REPLACE USING, jak i REPLACE ON nie usuwają danych, jednak REPLACE WHERE może usuwać dane.

Important

Jeśli dane zostały przypadkowo zastąpione, możesz użyć przywróć, aby cofnąć zmianę.

REPLACE WHERE

Można selektywnie zastąpić tylko dane pasujące do dowolnego wyrażenia za pomocą polecenia REPLACE WHERE.

Important

Aby skorzystać z odświeżania przyrostowego podczas uruchamiania REPLACE WHERE, użyj przepływów REPLACE WHERE w rozwiązaniu Spark Declarative Pipelines (SDP). Zobacz Przetwarzanie wsadowe z użyciem przepływów REPLACEWHERE.

Aby atomowo zastąpić zdarzenia ze stycznia w tabeli docelowej, która jest partycjonowana według start_date, danymi w replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Ten przykładowy kod zapisuje dane w pliku replace_data, sprawdza, czy wszystkie wiersze są zgodne z predykatem i wykonuje niepodzielne zastąpienie, przy użyciu semantyki overwrite. Jeśli jakiekolwiek wartości operacji wykraczają poza predykat, ta operacja domyślnie kończy się niepowodzeniem z powodu błędu.

W klasycznych obliczeniach, aby zmienić to zachowanie na overwrite wartości mieszczące się w zakresie predykatu i insert rekordy poza określonym zakresem, wyłącz sprawdzanie ograniczenia, ustawiając spark.databricks.delta.replaceWhere.constraintCheck.enabled na false:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Note

REPLACE WHERE akceptuje boolean_expression z pewnymi ograniczeniami. Zobacz INSERT w dokumentacji języka SQL.

W przypadku pustych zapytań źródłowych REPLACE WHERE można usunąć wiersze tabeli.

Dziedziczne zachowanie

Starsza wersja replaceWhere jest dostępna tylko w przypadku obliczeń klasycznych. Zobacz Omówienie klasycznych obliczeń.

Jeśli używasz starszego zachowania replaceWhere, zapytania nadpisują dane, które pasują do predykatu tylko w kolumnach partycji. Następujące polecenie spowoduje atomowe zastąpienie miesiąca stycznia w tabeli docelowej, która jest partycjonowana przez date, danymi z df.

Python
(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)
Scala
df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

Aby użyć starszego działania, ustaw spark.databricks.delta.replaceWhere.dataColumns.enabled na false:

Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamiczne nadpisywanie danych

Dynamiczne nadpisywanie danych selektywnie zastępuje dane odpowiadające określonym kolumnom klucza lub wyrażeniu boolowskiemu, pozostawiając wszystkie pozostałe dane bez zmian. Obsługiwane są tabele partycjonowane, niepartycjonowane tabele i tabele z klastrowaniem płynnym.

Dynamiczne nadpisywanie partycji stanowi podzbiór dynamicznego nadpisywania danych. Dynamiczne nadpisywanie partycji zastępuje wszystkie istniejące dane w każdej partycji, do której zostaną zapisane nowe dane, i pozostawia wszystkie pozostałe partycje bez zmian. Obsługiwane są tylko tabele partycjonowane.

REPLACE USING

Język SQL obsługiwany w środowisku Databricks Runtime 16.3 lub nowszym. Python i Scala obsługiwane w środowisku Databricks Runtime 18.2 lub nowszym. Informacje o różnicach w zachowaniu w wersjach Databricks Runtime od 16.3 do 17.1 można znaleźć w sekcji Starsze zachowanie.

REPLACE USING Umożliwia niezależne od rodzaju obliczeń, atomowe zastępowanie, które działa w magazynach SQL usługi Databricks, obliczeniach bezserwerowych i klasycznych. REPLACE USING Nie wymaga ustawienia konfiguracji sesji platformy Spark.

REPLACE USING zastępuje wiersze, gdy określone kolumny są równe w sensie relacji równości. Wszystkie inne dane pozostają niezmienione.

Użyj dynamicznego nadpisywania danych z użyciem REPLACE USING:

Python

(sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

W przypadku pustych zapytań REPLACE USING źródłowych nie usuwa żadnych wierszy tabeli.

W przypadku złożonych warunków dopasowania lub warunków bezpiecznych względem wartości NULL należy zamiast tego użyć REPLACE ON. Zobacz: REPLACE ON.

Zobacz INSERT w dokumentacji języka SQL.

Dotychczasowe zachowanie

W środowisku Databricks Runtime 16.3 do 17.1 REPLACE USING używa starszego zachowania i zezwala tylko na zastępowanie partycji dynamicznej, natomiast środowisko Databricks Runtime 17.2 i nowsze zezwala na dynamiczne zastępowanie danych.

Należy pamiętać o następujących ograniczeniach i zachowaniach w przypadku REPLACE USING dotychczasowego działania:

  • Należy określić pełny zestaw kolumn partycji tabeli w klauzuli USING .
  • Zawsze sprawdzaj, czy zapisane dane dotykają tylko oczekiwanych partycji. Pojedynczy wiersz w niewłaściwej partycji może przypadkowo zastąpić całą partycję.

REPLACE ON

Język SQL obsługiwany w środowisku Databricks Runtime 17.1 lub nowszym. Python i Scala obsługiwane w środowisku Databricks Runtime 18.2 lub nowszym.

REPLACE ON zastępuje wiersze, gdy spełniają warunek zdefiniowany przez użytkownika, w przeciwieństwie do REPLACE USING, które zastępuje wiersze, gdy określone kolumny są równe w sensie operatora równości. Użyj REPLACE ON , jeśli potrzebujesz zgodnej logiki, która REPLACE USING nie obsługuje, na przykład traktując NULL wartości jako równe.

Opcjonalnie użyj opcji targetAlias, aby określić alias tabeli docelowej, oraz interfejsów API .as() lub .alias(), aby określić alias danych źródłowych.

Aby uzyskać informacje o składni JĘZYKA SQL, zobacz INSERT.

Python

(sourceDataDF.alias("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.as("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events AS t
  REPLACE ON (s.event_id <=> t.event_id AND s.start_date <=> t.start_date)
  (SELECT * FROM source_data) AS s

W przypadku pustych zapytań REPLACE ON źródłowych nie usuwa żadnych wierszy tabeli.

Dynamiczne nadpisywanie partycji za pomocą partitionOverwriteMode (starsza wersja)

Important

Ta funkcja jest dostępna w publicznej wersji testowej.

Środowisko Databricks Runtime 11.3 LTS i nowsze, obsługują dynamiczne zastępowanie partycji dla tabel podzielonych na partycje przy użyciu trybu zastępowania: INSERT OVERWRITE w SQL lub zapis ramki danych za pomocą df.write.mode("overwrite"). Ten typ nadpisywania jest dostępny tylko dla klasycznych metod obliczeniowych, a nie dla magazynów SQL Databricks ani obliczeń bezserwerowych.

Ostrzeżenie

Jeśli to możliwe, użyj INSERT REPLACE USING zamiast nadpisywania partycji INSERT OVERWRITE PARTITION i spark.sql.sources.partitionOverwriteMode=dynamic. Nadpisanie partycji może używać nieaktualnych danych, gdy zajdą zmiany w partycjonowaniu.

Aby użyć trybu zastępowania partycji dynamicznej, ustaw konfigurację sesji platformy Spark w spark.sql.sources.partitionOverwriteMode na dynamic wartość. Alternatywnie możesz ustawić DataFrameWriter opcję partitionOverwriteMode na dynamic. Jeśli istnieje, opcja specyficzna dla zapytania zastępuje tryb zdefiniowany w konfiguracji sesji. Wartość domyślna parametru spark.sql.sources.partitionOverwriteMode to static.

W poniższym przykładzie użyto partitionOverwriteMode:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Należy pamiętać o następujących ograniczeniach i zachowaniach dla programu partitionOverwriteMode:

  • Nie można ustawić overwriteSchema na true.
  • Nie można określić jednocześnie partitionOverwriteMode i replaceWhere w tej samej operacji DataFrameWriter.
  • Jeśli określisz replaceWhere warunek za pomocą DataFrameWriter opcji, usługa Delta Lake zastosuje ten warunek do kontrolowania, które dane są nadpisywane. Ta opcja ma pierwszeństwo przed konfiguracją partitionOverwriteMode na poziomie sesji.
  • Zawsze sprawdzaj, czy zapisane dane dotykają tylko oczekiwanych partycji. Pojedynczy wiersz w niewłaściwej partycji może przypadkowo zastąpić całą partycję.