Sdílet prostřednictvím


Selektivní přepsání dat pomocí Delta Lake

Azure Databricks využívá funkci Delta Lake k podpoře dvou různých možností selektivního přepsání:

  • Možnost replaceWhere atomicky nahradí všechny záznamy, které odpovídají danému predikátu.
  • Adresáře dat můžete nahradit na základě způsobu dělení tabulek pomocí dynamických přepsání oddílů.

Pro většinu operací doporučuje replaceWhere Databricks určit, která data se mají přepsat.

Důležité

Pokud byla data omylem přepsána, můžete změnu vrátit zpět pomocí obnovení .

Libovolná selektivní přepsání pomocí replaceWhere

Můžete selektivně přepsat pouze data, která odpovídají libovolnému výrazu.

Poznámka:

SQL vyžaduje Databricks Runtime 12.2 LTS nebo vyšší.

Následující příkaz atomicky nahrazuje události v lednu v cílové tabulce, která je rozdělena pomocí start_datedat v replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Tento ukázkový kód zapíše data do replace_data, ověří, že všechny řádky odpovídají predikátu a provádí atomické nahrazení pomocí overwrite sémantiky. Pokud některé hodnoty v operaci spadají mimo omezení, tato operace ve výchozím nastavení selže s chybou.

Toto chování můžete změnit na overwrite hodnoty v rámci rozsahu predikátu a insert záznamů, které spadají mimo zadanou oblast. Pokud to chcete udělat, zakažte kontrolu omezení nastavením spark.databricks.delta.replaceWhere.constraintCheck.enabled na false pomocí jednoho z následujících nastavení:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Chování starší verze

Starší výchozí chování mělo replaceWhere přepsání dat odpovídajících predikátu pouze pro sloupce oddílů. V tomto starším modelu by následující příkaz atomicky nahradil měsíc Leden v cílové tabulce, který je rozdělený na oddíly date, daty v df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Pokud se chcete vrátit ke starému chování, můžete příznak zakázat spark.databricks.delta.replaceWhere.dataColumns.enabled :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamické přepsání oddílů

Důležité

Tato funkce je ve verzi Public Preview.

Databricks Runtime 11.3 LTS a vyšší podporuje režim přepsání dynamických oddílů pro dělené tabulky. U tabulek s více oddíly podporují Databricks Runtime 11.3 LTS a níže pouze dynamické přepsání oddílů, pokud jsou všechny sloupce oddílů stejného datového typu.

V režimu přepsání dynamického oddílu operace přepíšou všechna existující data v každém logickém oddílu, pro které zápis potvrdí nová data. Všechny existující logické oddíly, pro které zápis neobsahuje data, zůstanou beze změny. Tento režim platí pouze v případě, že se data zapisují v režimu přepsání: buď INSERT OVERWRITE v SQL, nebo při zápisu do datového rámce s df.write.mode("overwrite").

Nakonfigurujte režim přepsání dynamického oddílu nastavením konfigurace spark.sql.sources.partitionOverwriteMode relace Sparku na dynamic. Můžete to také povolit nastavením DataFrameWriter možnosti partitionOverwriteModedynamic. Pokud existuje, možnost specifická pro dotaz přepíše režim definovaný v konfiguraci relace. Výchozí hodnota je partitionOverwriteModestatic.

Důležité

Ověřte, že se data zapsaná dynamickým oddílem přepíší jenom do očekávaných oddílů. Jeden řádek v nesprávném oddílu může vést k neúmyslnému přepsání celého oddílu.

Následující příklad ukazuje použití dynamického přepsání oddílů:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Poznámka:

  • Dynamické přepsání oddílu koliduje s možností replaceWhere pro dělené tabulky.
    • Pokud je v konfiguraci relace Sparku povolena možnost přepsání dynamického oddílu a replaceWhere je k dispozici jako DataFrameWriter možnost, delta Lake přepíše data podle výrazu replaceWhere (možnosti specifické pro dotazy přepisují konfigurace relace).
    • Pokud mají možnosti možnost přepsání dynamického DataFrameWriter oddílu i replaceWhere povolené, zobrazí se chyba.
  • Nelze určit overwriteSchema , jako true při použití dynamického přepsání oddílu.