Potrzeba optymalizacji zapisu na platformie Apache Spark

Obciążenia analityczne w aparatach przetwarzania danych big data, takich jak Apache Spark, działają najbardziej wydajnie podczas korzystania ze standardowych większych rozmiarów plików. Relacja między rozmiarem pliku, liczbą plików, liczbą procesów roboczych platformy Spark i jego konfiguracjami odgrywa kluczową rolę w zakresie wydajności. Obciążenia pozyskiwania do tabel data lake mogą mieć dziedziczone cechy ciągłego pisania wielu małych plików; ten scenariusz jest powszechnie znany jako "problem z małym plikiem".

Optymalizacja zapisu to funkcja usługi Delta Lake w usłudze Synapse, która zmniejsza liczbę zapisanych plików i ma na celu zwiększenie indywidualnego rozmiaru zapisanych danych. Dynamicznie optymalizuje partycje podczas generowania plików o domyślnym rozmiarze 128 MB. Rozmiar pliku docelowego można zmienić na wymagania dotyczące obciążenia przy użyciu konfiguracji.

Ta funkcja umożliwia osiągnięcie rozmiaru pliku przy użyciu dodatkowej fazy mieszania danych w partycjach, co powoduje dodatkowy koszt przetwarzania podczas zapisywania danych. Mała kara za zapis powinna być przeważona przez wydajność odczytu w tabelach.

Uwaga

  • Jest ona dostępna w pulach usługi Synapse dla platformy Apache Spark w wersjach powyżej 3.1.

Zalety optymalizacji zapisów

  • Jest ona dostępna w tabelach usługi Delta Lake zarówno dla wzorców zapisu usługi Batch, jak i przesyłania strumieniowego.
  • Nie ma potrzeby zmieniania spark.write wzorca polecenia. Ta funkcja jest włączana przez ustawienie konfiguracji lub właściwość tabeli.
  • Zmniejsza liczbę transakcji zapisu w porównaniu z poleceniem OPTIMIZE.
  • Operacje OPTYMALIZACJI będą szybsze, ponieważ będą działać na mniejszej liczbie plików.
  • Polecenie VACUUM w celu usunięcia starych nieużywanych plików będzie również działać szybciej.
  • Zapytania przeskanują mniej plików o bardziej optymalnych rozmiarach plików, zwiększając wydajność odczytu lub użycie zasobów.

Optymalizowanie scenariuszy użycia zapisu

Zastosowanie

  • Tabele partycjonowane w usłudze Delta Lake podlegają wzorom zapisu, które generują nieoptymalne (mniej niż 128 MB) lub nietypowe rozmiary plików (pliki o różnych rozmiarach między sobą).
  • Ponownie partycjonowane ramki danych, które zostaną zapisane na dysku z nieoptymalnym rozmiarem plików.
  • Tabele partycjonowane w usłudze Delta Lake przeznaczone dla małych poleceń SQL wsadowych, takich jak UPDATE, DELETE, MERGE, CREATE TABLE AS SELECT, INSERT INTO itp.
  • Scenariusze pozyskiwania przesyłania strumieniowego z dołączanymi wzorcami danych do tabel partycjonowanych przez usługę Delta Lake, w których można tolerować dodatkowe opóźnienie zapisu.

Kiedy należy tego uniknąć

  • Tabele nieudzielone na partycje.
  • Przypadki użycia, w których dodatkowe opóźnienie zapisu nie jest akceptowalne.
  • Duże tabele z dobrze zdefiniowanymi harmonogramami optymalizacji i wzorcami odczytu.

Jak włączyć i wyłączyć funkcję optymalizowania zapisu

Funkcja optymalizacji zapisu jest domyślnie wyłączona. W puli platformy Spark 3.3 jest ona domyślnie włączona dla tabel partycjonowanych.

Po ustawieniu konfiguracji dla puli lub sesji wszystkie wzorce zapisu platformy Spark będą używać funkcji .

Aby użyć funkcji optymalizacji zapisu, włącz ją przy użyciu następującej konfiguracji:

  1. Scala i PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

Aby sprawdzić bieżącą wartość konfiguracji, użyj polecenia , jak pokazano poniżej:

  1. Scala i PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

Aby wyłączyć funkcję optymalizacji zapisu, zmień następującą konfigurację, jak pokazano poniżej:

  1. Scala i PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

Kontrolowanie optymalizacji zapisu przy użyciu właściwości tabeli

W nowych tabelach

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Przy użyciu interfejsu API deltaTableBuilder:

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

W istniejących tabelach

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Korzystanie z interfejsu API deltaTableBuilder

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Jak uzyskać i zmienić bieżącą konfigurację maksymalnego rozmiaru pliku dla optymalizacji zapisu

Aby uzyskać bieżącą wartość konfiguracji, użyj poleceń dzwonka. Wartość domyślna to 128 MB.

  1. Scala i PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • Aby zmienić wartość konfiguracji
  1. Scala i PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

Następne kroki