Потребность в оптимизации записи в Apache Spark
Аналитические рабочие нагрузки в модулях обработки больших данных, таких как Apache Spark, выполняются эффективнее всего при использовании больших файлов стандартного размера. Важную роль для производительности играет соотношение между размером файла, количеством файлов, числом рабочих ролей и конфигурациями Spark. Рабочие нагрузки приема в таблицы озера данных могут иметь унаследованную характеристику постоянной записи большого количества небольших файлов; этот сценарий обычно называется "проблемой с небольшим файлом".
Оптимизация записи — это функция Delta Lake в Synapse, которая сокращает число создаваемых файлов и увеличивает размер отдельных файлов, в которые ведется запись данных. Он динамически оптимизирует секции при создании файлов с размером по умолчанию 128 МБ. Размер целевого файла может быть изменен для каждой рабочей нагрузки с помощью конфигураций.
Эта функция обеспечивает нужный размер файла с помощью дополнительного этапа перетасовки данных по секциям, что ведет к повышению затрат на обработку при записи данных. Небольшой штраф при записи должен компенсироваться эффективностью чтения таблиц.
Примечание.
- Он доступен в пулах Synapse для версий Apache Spark выше 3.1.
Преимущества оптимизации записи
- Эта функция доступна в таблицах Delta Lake для записи как в пакетном, так и в потоковом режиме.
- Менять шаблон работы команды
spark.write
не требуется. Эта функция активируется параметром конфигурации или свойством таблицы. - Она уменьшает число транзакций записи по сравнению с командой OPTIMIZE.
- Операции OPTIMIZE выполняются быстрее, так как работают с меньшим количеством файлов.
- Команда VACUUM для удаления старых файлов без ссылок также работает быстрее.
- Запросы сканируют меньше файлов оптимизированного размера, что повышает или производительность чтения, или использование ресурсов.
Оптимизация сценариев использования записи
Сценарии использования
- Секционированные таблицы Delta Lake подвергаются написанию шаблонов, которые создают неоптимальные (менее 128 МБ) или нестандартные размеры файлов (файлы с различными размерами между собой).
- Повторно секционированные кадры данных, которые записываются на диск с неоптимальным размером файлов.
- Секционированные таблицы Delta Lake, к которым обращаются небольшие пакетные команды SQL, такие как UPDATE, DELETE, MERGE, CREATE TABLE AS SELECT, INSERT INTO и т. д.
- Сценарии потокового приема с шаблонами добавления данных в секционированные таблицы Delta Lake, где допустима дополнительная задержка при записи.
Сценарии, которых требуется избегать
- Несекционированные таблицы.
- Варианты использования, при которых дополнительная задержка при записи неприемлема.
- Большие таблицы с четко определенной структурой оптимизации и шаблонами чтения.
Включение и отключение функции оптимизации записи
Функция оптимизации записи по умолчанию отключена. В пуле Spark 3.3 он включен по умолчанию для секционированных таблиц.
После настройки параметров для пула или сеанса эта функция применяется для всех шаблонов записи Spark.
Для использования функции оптимизации записи включите ее с помощью следующего параметра:
- Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
- SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled` = true
Чтобы проверить текущее значение конфигурации, используйте команду, как показано ниже.
- Scala и PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
- SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled`
Чтобы отключить функцию оптимизации записи, измените следующую конфигурацию, как показано ниже.
- Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
- SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled` = false
Управление оптимизацией записи с помощью свойств таблицы
Новые таблицы
- SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
- Scala
Использование API DeltaTableBuilder
val table = DeltaTable.create()
.tableName("<table_name>")
.addColumnn("<colName>", <dataType>)
.location("<table_location>")
.property("delta.autoOptimize.optimizeWrite", "true")
.execute()
Существующие таблицы
- SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
- Scala
Использование API DeltaTableBuilder
val table = DeltaTable.replace()
.tableName("<table_name>")
.location("<table_location>")
.property("delta.autoOptimize.optimizeWrite", "true")
.execute()
Как получить и изменить текущую конфигурацию максимального размера файла для оптимизации записи
Чтобы получить текущее значение параметра, используйте команды ниже. Значение по умолчанию — 128 МБ.
- Scala и PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
- SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
- Изменение значения параметра
- Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
- SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728