Selektywne zastępowanie danych za pomocą usługi Delta Lake
Usługa Azure Databricks korzysta z funkcji usługi Delta Lake, aby obsługiwać dwie różne opcje selektywnego zastępowania:
- Opcja
replaceWhere
niepodzielna zastępuje wszystkie rekordy zgodne z danym predykatem. - Katalogi danych można zastąpić na podstawie sposobu partycjonowania tabel przy użyciu zastępowania partycji dynamicznej.
W przypadku większości operacji usługa Databricks zaleca użycie polecenia replaceWhere
w celu określenia, które dane mają być zastępowane.
Ważne
Jeśli dane zostały przypadkowo zastąpione, możesz użyć przywracania , aby cofnąć zmianę.
Dowolne selektywne zastępowanie za pomocą polecenia replaceWhere
Można selektywnie zastąpić tylko dane pasujące do dowolnego wyrażenia.
Uwaga
Sql wymaga środowiska Databricks Runtime 12.2 LTS lub nowszego.
Następujące polecenie niepodziealnie zastępuje zdarzenia w styczniu w tabeli docelowej, która jest partycjonowana przez start_date
element , z danymi w pliku :replace_data
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Ten przykładowy kod zapisuje dane w replace_data
pliku , sprawdza, czy wszystkie wiersze są zgodne z predykatem i wykonuje niepodzielne zastąpienie przy użyciu overwrite
semantyki. Jeśli jakiekolwiek wartości operacji wykraczają poza ograniczenie, ta operacja domyślnie kończy się niepowodzeniem z powodu błędu.
To zachowanie można zmienić na overwrite
wartości w zakresie predykatów i insert
rekordów, które wykraczają poza określony zakres. Aby to zrobić, wyłącz sprawdzanie ograniczeń, ustawiając spark.databricks.delta.replaceWhere.constraintCheck.enabled
wartość false przy użyciu jednego z następujących ustawień:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Zachowanie starszej wersji
Starsze zachowanie domyślne zastępowało replaceWhere
dane pasujące tylko do predykatu w kolumnach partycji. W przypadku tego starszego modelu następujące polecenie spowoduje niepodzielne zastąpienie miesiąca stycznia w tabeli docelowej, która jest partycjonowana przez date
element , przy użyciu danych w elemencie :df
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Jeśli chcesz wrócić do starego zachowania, możesz wyłączyć flagę spark.databricks.delta.replaceWhere.dataColumns.enabled
:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Zastępowanie partycji dynamicznej
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Środowisko Databricks Runtime 11.3 LTS i nowsze obsługują tryb zastępowania partycji dynamicznej dla tabel partycjonowanych. W przypadku tabel z wieloma partycjami środowisko Databricks Runtime 11.3 LTS i poniżej obsługuje tylko zastępowanie partycji dynamicznej, jeśli wszystkie kolumny partycji mają ten sam typ danych.
W trybie zastępowania partycji dynamicznej operacje zastępują wszystkie istniejące dane w każdej partycji logicznej, dla której zapis zatwierdza nowe dane. Wszystkie istniejące partycje logiczne, dla których zapis nie zawiera danych, pozostają niezmienione. Ten tryb ma zastosowanie tylko wtedy, gdy dane są zapisywane w trybie zastępowania: INSERT OVERWRITE
w języku SQL lub w obiekcie DataFrame zapisu za pomocą polecenia df.write.mode("overwrite")
.
Skonfiguruj tryb zastępowania partycji dynamicznej, ustawiając konfigurację spark.sql.sources.partitionOverwriteMode
sesji platformy Spark na dynamic
wartość . Można to również włączyć, ustawiając DataFrameWriter
opcję partitionOverwriteMode
na dynamic
. Jeśli istnieje, opcja specyficzna dla zapytania zastępuje tryb zdefiniowany w konfiguracji sesji. Wartość domyślna to partitionOverwriteMode
static
.
Ważne
Sprawdź, czy dane zapisane za pomocą partycji dynamicznej zastępują tylko oczekiwane partycje. Pojedynczy wiersz w nieprawidłowej partycji może prowadzić do przypadkowego zastąpienia całej partycji.
W poniższym przykładzie pokazano użycie zastępowania partycji dynamicznej:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Uwaga
- Partycja dynamiczna zastępuje konflikty z opcją
replaceWhere
dla tabel partycjonowanych.- Jeśli funkcja zastępowania partycji dynamicznej jest włączona w konfiguracji sesji platformy Spark i
replaceWhere
jest udostępniana jako opcja, usługa Delta Lake zastępuje dane zgodnie zreplaceWhere
wyrażeniemDataFrameWriter
(opcje specyficzne dla zapytania zastępują konfiguracje sesji). - Zostanie wyświetlony błąd, jeśli
DataFrameWriter
opcje mają zarówno partycję dynamiczną, jak ireplaceWhere
włączoną.
- Jeśli funkcja zastępowania partycji dynamicznej jest włączona w konfiguracji sesji platformy Spark i
- Nie można określić
overwriteSchema
wartości podczastrue
używania zastępowania partycji dynamicznej.