Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Usługa Azure Databricks korzysta z funkcji usługi Delta Lake, aby obsługiwać dwie różne opcje selektywnego zastępowania:
- Opcja
replaceWhereniepodzielna zastępuje wszystkie rekordy zgodne z danym predykatem. - Możesz zastąpić foldery danych w zależności od sposobu partycjonowania tabel, używając dynamicznego nadpisywania partycji.
W przypadku większości operacji usługa Databricks zaleca użycie polecenia replaceWhere w celu określenia, które dane mają być zastępowane.
Important
Jeśli dane zostały przypadkowo zastąpione, możesz użyć przywróć, aby cofnąć zmianę.
Dowolne selektywne zastępowanie za pomocą polecenia replaceWhere
Można selektywnie zastąpić tylko dane pasujące do dowolnego wyrażenia.
Note
Sql wymaga środowiska Databricks Runtime 12.2 LTS lub nowszego.
Następujące polecenie niepodzielnie zastępuje zdarzenia w styczniu w tabeli docelowej, która jest partycjonowana na podstawie start_datez danymi w replace_data.
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Ten przykładowy kod zapisuje dane w replace_datapliku , sprawdza, czy wszystkie wiersze są zgodne z predykatem i wykonuje niepodzielne zastąpienie przy użyciu overwrite semantyki. Jeśli jakiekolwiek wartości operacji wykraczają poza ograniczenie, ta operacja domyślnie kończy się niepowodzeniem z powodu błędu.
To zachowanie można zmienić na overwrite wartości mieszczące się w zakresie predykatu i insert rekordy wykraczające poza określony zakres. Aby to zrobić, wyłącz sprawdzanie ograniczeń, ustawiając spark.databricks.delta.replaceWhere.constraintCheck.enabled wartość false przy użyciu jednego z następujących ustawień:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Zachowanie starszej wersji
Starsze domyślne zachowanie powodowało, że replaceWhere zastępowało dane spełniające tylko warunek w kolumnach partycji. W przypadku tego starszego modelu następujące polecenie spowoduje atomowe zastąpienie miesiąca stycznia w tabeli docelowej, która jest partycjonowana przez date, danymi z df:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Jeśli chcesz wrócić do starego zachowania, możesz wyłączyć flagę spark.databricks.delta.replaceWhere.dataColumns.enabled :
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
nadpisywanie partycji dynamicznej
Mechanizm nadpisywania dynamicznego aktualizuje tylko te partycje, w których zapis zatwierdza nowe dane. Zastępuje wszystkie istniejące dane w tych partycjach i pozostawia inne niezmienione.
Usługa Azure Databricks obsługuje dwa podejścia:
-
REPLACE USING(zalecane) — działa we wszystkich typach obliczeniowych, w tym w magazynach SQL usługi Databricks, bezserwerowych obliczeniach i klasycznych obliczeniach. Nie wymaga ustawienia konfiguracji sesji platformy Spark. -
partitionOverwriteMode(starsza wersja) — wymaga klasycznego środowiska obliczeniowego i ustawienia konfiguracji sesji platformy Spark. Nieobsługiwane na Databricks SQL lub obliczeniach bezserwerowych.
W poniższych sekcjach pokazano, jak używać poszczególnych metod.
Dynamiczne nadpisywanie partycji za pomocą REPLACE USING
Środowisko Databricks Runtime w wersji 16.3 lub nowszej obsługuje zastępowanie partycji dynamicznej dla tabel partycjonowanych przy użyciu polecenia REPLACE USING. Ta metoda umożliwia selektywne zastępowanie danych we wszystkich typach obliczeniowych bez konieczności ustawiania konfiguracji sesji platformy Spark.
REPLACE USING Umożliwia niezależne od rodzaju obliczeń, atomowe zastępowanie, które działa w magazynach SQL usługi Databricks, obliczeniach bezserwerowych i klasycznych.
REPLACE USING zastępuje tylko partycje objęte danymi przychodzącymi. Wszystkie pozostałe partycje pozostają niezmienione.
W poniższym przykładzie pokazano, jak używać dynamicznego nadpisywania partycji za pomocą REPLACE USING. Obecnie można używać tylko języka SQL, a nie języka Python ani języka Scala. Aby uzyskać szczegółowe informacje, zobacz INSERT w dokumentacji języka SQL.
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
Należy pamiętać o następujących ograniczeniach i zachowaniach podczas zastępowania dynamicznych partycji:
- Należy określić pełny zestaw kolumn partycji tabeli w klauzuli
USING. - Zawsze sprawdzaj, czy zapisane dane dotykają tylko oczekiwanych partycji. Pojedynczy wiersz w niewłaściwej partycji może przypadkowo zastąpić całą partycję.
Jeśli potrzebujesz bardziej konfigurowalnej logiki dopasowywania niż REPLACE USING obsługiwana, na przykład traktuj NULL wartości jako równe, zamiast tego użyj uzupełniającego REPLACE ON . Zobacz INSERT , aby uzyskać szczegółowe informacje.
Dynamiczne nadpisywanie partycji z partitionOverwriteMode (starsza wersja)
Important
Ta funkcja jest dostępna w publicznej wersji testowej.
Środowisko Databricks Runtime 11.3 LTS i nowsze, obsługują dynamiczne zastępowanie partycji dla tabel podzielonych na partycje przy użyciu trybu zastępowania: INSERT OVERWRITE w SQL lub zapis ramki danych za pomocą df.write.mode("overwrite"). Ten typ nadpisywania jest dostępny tylko dla klasycznych metod obliczeniowych, a nie dla magazynów SQL Databricks ani obliczeń bezserwerowych.
Skonfiguruj tryb nadpisywania dynamicznych partycji, ustawiając konfigurację sesji Spark z spark.sql.sources.partitionOverwriteMode na dynamic. Alternatywnie możesz ustawić DataFrameWriter opcję partitionOverwriteMode na dynamic. Jeśli istnieje, opcja specyficzna dla zapytania zastępuje tryb zdefiniowany w konfiguracji sesji. Wartość domyślna to spark.sql.sources.partitionOverwriteModestatic.
W poniższym przykładzie pokazano użycie polecenia partitionOverwriteMode:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Należy pamiętać o następujących ograniczeniach i zachowaniach dla programu partitionOverwriteMode:
- Nie można ustawić
overwriteSchemanatrue. - Nie można określić jednocześnie
partitionOverwriteModeireplaceWherew tej samej operacjiDataFrameWriter. - Jeśli określisz
replaceWherewarunek za pomocąDataFrameWriteropcji, usługa Delta Lake zastosuje ten warunek do kontrolowania, które dane są nadpisywane. Ta opcja ma pierwszeństwo przed konfiguracjąpartitionOverwriteModena poziomie sesji. - Zawsze sprawdzaj, czy zapisane dane dotykają tylko oczekiwanych partycji. Pojedynczy wiersz w niewłaściwej partycji może przypadkowo zastąpić całą partycję.