Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается, как использовать Mosaic Streaming для преобразования данных из Apache Spark в формат, поддерживаемый PyTorch.
Mosaic Streaming — это библиотека загрузки данных с открытым исходным кодом. Он обеспечивает одноузловую или распределенную подготовку и оценку моделей глубокого обучения из наборов данных, которые уже загружены в качестве кадров данных Apache Spark. Mosaic Streaming в основном поддерживает Mosaic Composer, но также интегрируется с PyTorch, PyTorch Lightning и TorchDistributor. Потоковая передача Mosaic предоставляет ряд преимуществ перед традиционными загрузчиками данных PyTorch, включая:
- Совместимость с любым типом данных, включая изображения, текст, видео и многомодальные данные.
- Поддержка крупных поставщиков облачных хранилищ (AWS, OCI, GCS, Azure, Databricks UC Volume и любых объектных хранилищ, совместимых с S3, таких как Cloudflare R2, Coreweave, Backblaze B2 и т. д.)
- Максимизация гарантий правильности, производительности, гибкости и простоты использования. Дополнительные сведения см. на странице основных функций .
Общие сведения о потоковой передаче Мозаики см. в документации по API потоковой передачи.
Примечание.
Мозаичная потоковая передача предварительно установлена во всех версиях Databricks Runtime 15.2 ML и выше.
Загрузка данных из Spark DataFrames с использованием Mosaic Streaming
Mosaic Streaming предоставляет простой рабочий процесс для преобразования из Apache Spark в формат Mosaic Data Shard (MDS), который затем можно загрузить для использования в распределенной среде.
Рекомендуемый рабочий процесс:
- Используйте Apache Spark для загрузки и, при необходимости, предварительной обработки данных.
- Используйте
streaming.base.converters.dataframe_to_mdsдля сохранения фрейма данных на диск для временного хранения и/или в каталог Unity для постоянного хранения. Эти данные будут храниться в формате MDS и могут быть оптимизированы с поддержкой сжатия и хэширования. Дополнительные варианты использования также могут включать предварительную обработку данных с помощью пользовательских функций. Для получения дополнительной информации см. руководство по Spark DataFrame в MDS. - Используется
streaming.StreamingDatasetдля загрузки необходимых данных в память.StreamingDataset— это версия PyTorch IterableDataset, которая включает эластично детерминированное перемешивание, обеспечивающее быстрое возобновление в середине эпохи. Дополнительные сведения см. в документации StreamingDataset. - Используется
streaming.StreamingDataLoaderдля загрузки необходимых данных для обучения, оценки и тестирования.StreamingDataLoader— это версия DataLoader из PyTorch, которая предоставляет дополнительный интерфейс контрольных точек и возобновления, с помощью которого отслеживается количество примеров, обработанных моделью на этом ранге.
Полный пример см. в следующей записной книжке:
Упростите загрузку данных из Spark в PyTorch с помощью записной книжки Mosaic Streaming
Устранение неполадок
Ошибка проверки подлинности
Если при загрузке данных из тома каталога Unity с помощью StreamingDatasetотображается следующая ошибка, настройте переменные среды, как показано ниже.
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
Примечание.
Если эта ошибка возникает при выполнении распределенного обучения с помощью TorchDistributor, необходимо также задать переменные среды на рабочих узлах.
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)
Проблемы с общей памятью Python 3.11
Из-за проблем с реализацией StreamingDataset общей памяти Python 3.11 может возникнуть временные проблемы в Databricks Runtime 15.4 LTS для машинного обучения. Эти проблемы можно избежать, выполнив обновление до Databricks Runtime 16.4 LTS для машинного обучения, так как Python 3.12 устраняет эти проблемы.