Поделиться через


Загрузка данных с помощью Mosaic Streaming

В этой статье описывается, как использовать Mosaic Streaming для преобразования данных из Apache Spark в формат, поддерживаемый PyTorch.

Mosaic Streaming — это библиотека загрузки данных с открытым исходным кодом. Он обеспечивает одноузловую или распределенную подготовку и оценку моделей глубокого обучения из наборов данных, которые уже загружены в качестве кадров данных Apache Spark. Mosaic Streaming в основном поддерживает Mosaic Composer, но также интегрируется с PyTorch, PyTorch Lightning и TorchDistributor. Потоковая передача Mosaic предоставляет ряд преимуществ перед традиционными загрузчиками данных PyTorch, включая:

  • Совместимость с любым типом данных, включая изображения, текст, видео и многомодальные данные.
  • Поддержка крупных поставщиков облачных хранилищ (AWS, OCI, GCS, Azure, Databricks UC Volume и любых объектных хранилищ, совместимых с S3, таких как Cloudflare R2, Coreweave, Backblaze B2 и т. д.)
  • Максимизация гарантий правильности, производительности, гибкости и простоты использования. Дополнительные сведения см. на странице основных функций .

Общие сведения о потоковой передаче Мозаики см. в документации по API потоковой передачи.

Примечание.

Мозаичная потоковая передача предварительно установлена во всех версиях Databricks Runtime 15.2 ML и выше.

Загрузка данных из Spark DataFrames с использованием Mosaic Streaming

Mosaic Streaming предоставляет простой рабочий процесс для преобразования из Apache Spark в формат Mosaic Data Shard (MDS), который затем можно загрузить для использования в распределенной среде.

Рекомендуемый рабочий процесс:

  1. Используйте Apache Spark для загрузки и, при необходимости, предварительной обработки данных.
  2. Используйте streaming.base.converters.dataframe_to_mds для сохранения фрейма данных на диск для временного хранения и/или в каталог Unity для постоянного хранения. Эти данные будут храниться в формате MDS и могут быть оптимизированы с поддержкой сжатия и хэширования. Дополнительные варианты использования также могут включать предварительную обработку данных с помощью пользовательских функций. Для получения дополнительной информации см. руководство по Spark DataFrame в MDS.
  3. Используется streaming.StreamingDataset для загрузки необходимых данных в память. StreamingDataset — это версия PyTorch IterableDataset, которая включает эластично детерминированное перемешивание, обеспечивающее быстрое возобновление в середине эпохи. Дополнительные сведения см. в документации StreamingDataset.
  4. Используется streaming.StreamingDataLoader для загрузки необходимых данных для обучения, оценки и тестирования. StreamingDataLoader — это версия DataLoader из PyTorch, которая предоставляет дополнительный интерфейс контрольных точек и возобновления, с помощью которого отслеживается количество примеров, обработанных моделью на этом ранге.

Полный пример см. в следующей записной книжке:

Упростите загрузку данных из Spark в PyTorch с помощью записной книжки Mosaic Streaming

Получить записную книжку

Устранение неполадок

Ошибка проверки подлинности

Если при загрузке данных из тома каталога Unity с помощью StreamingDatasetотображается следующая ошибка, настройте переменные среды, как показано ниже.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Примечание.

Если эта ошибка возникает при выполнении распределенного обучения с помощью TorchDistributor, необходимо также задать переменные среды на рабочих узлах.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)

Проблемы с общей памятью Python 3.11

Из-за проблем с реализацией StreamingDataset общей памяти Python 3.11 может возникнуть временные проблемы в Databricks Runtime 15.4 LTS для машинного обучения. Эти проблемы можно избежать, выполнив обновление до Databricks Runtime 16.4 LTS для машинного обучения, так как Python 3.12 устраняет эти проблемы.