Potřeba optimalizace zápisu v Apache Sparku

Analytické úlohy na strojích pro zpracování velkých objemů dat, jako je Apache Spark, fungují nejefektivněji při použití standardizovaných větších velikostí souborů. Vztah mezi velikostí souboru, počtem souborů, počtem pracovních procesů Sparku a jeho konfigurací hraje důležitou roli při výkonu. Úlohy příjmu dat do tabulek Data Lake mohou mít zděděnou charakteristiku neustálého zápisu velkého množství malých souborů; tento scénář se běžně označuje jako "problém s malým souborem".

Optimalizace zápisu je funkce Delta Lake v Synapse, která snižuje počet zapsaných souborů a má za cíl zvýšit velikost jednotlivých souborů zapsaných dat. Dynamicky optimalizuje oddíly při generování souborů s výchozí velikostí 128 MB. Velikost cílového souboru se může změnit podle požadavků na úlohy pomocí konfigurací.

Tato funkce dosahuje velikosti souboru pomocí dodatečné fáze náhodného prohazování dat mezi oddíly, což způsobuje dodatečné náklady na zpracování při zápisu dat. Malé penalizace zápisu by měla být převažovaná efektivitou čtení v tabulkách.

Poznámka:

  • Je k dispozici ve fondech Synapse pro verze Apache Sparku vyšší než 3.1.

Výhody optimalizace zápisů

  • Je k dispozici v tabulkách Delta Lake pro vzory zápisu batch i streamování.
  • Vzor příkazů není potřeba měnit spark.write . Tato funkce je povolená nastavením konfigurace nebo vlastností tabulky.
  • Snižuje počet transakcí zápisu v porovnání s příkazem OPTIMIZE.
  • Operace OPTIMIZE budou rychlejší, protože bude fungovat s menším počtem souborů.
  • Příkaz VACUUM pro odstranění starých neodkazovaných souborů bude fungovat také rychleji.
  • Dotazy budou kontrolovat méně souborů s větší optimální velikostí souborů, což zlepšuje výkon čtení nebo využití prostředků.

Optimalizace scénářů použití zápisu

Kdy ji použít

  • Tabulky dělené systémem Delta Lake podléhají vzorcům zápisu, které generují neoptimální (méně než 128 MB) nebo ne standardizované velikosti souborů (soubory s různými velikostmi mezi sebou).
  • Repartitioned data frames that will be written to disk with suboptimal files size.
  • Tabulky dělené službou Delta Lake, na které cílí malé dávkové příkazy SQL, jako jsou UPDATE, DELETE, MERGE, CREATE TABLE AS SELECT, INSERT INTO atd.
  • Scénáře příjmu dat streamování se vzory přidávaných dat do tabulek rozdělených do Delta Lake, kde je latence dodatečného zápisu tolerovatelná.

Kdy se tomu vyhnout

  • Nesedělené tabulky.
  • Případy použití, kdy není přijatelná další latence zápisu.
  • Velké tabulky s dobře definovanými plány optimalizace a vzory čtení

Povolení a zakázání funkce optimalizace zápisu

Funkce optimalizace zápisu je ve výchozím nastavení zakázaná. Ve fondu Spark 3.3 je ve výchozím nastavení povolená pro dělené tabulky.

Jakmile je konfigurace nastavená pro fond nebo relaci, budou funkce používat všechny vzory zápisu Sparku.

Pokud chcete použít funkci optimalizace zápisu, povolte ji pomocí následující konfigurace:

  1. Scala a PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

Pokud chcete zkontrolovat aktuální hodnotu konfigurace, použijte příkaz, jak je znázorněno níže:

  1. Scala a PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

Pokud chcete funkci optimalizace zápisu zakázat, změňte následující konfiguraci, jak je znázorněno níže:

  1. Scala a PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

Řízení optimalizace zápisu pomocí vlastností tabulky

Na nových tabulkách

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Použití rozhraní DeltaTableBuilder API:

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

U existujících tabulek

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Použití rozhraní DeltaTableBuilder API

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Jak získat a změnit aktuální konfiguraci maximální velikosti souboru pro optimalizaci zápisu

Pokud chcete získat aktuální hodnotu konfigurace, použijte příkazy bellow. Výchozí hodnota je 128 MB.

  1. Scala a PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • Změna hodnoty konfigurace
  1. Scala a PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

Další kroky