Udostępnij za pośrednictwem


Ładowanie danych za pomocą Mosaic Streaming

W tym artykule opisano sposób użycia Mosaic Streaming do konwertowania danych z Apache Spark na format zgodny z biblioteką PyTorch.

Mozaika Streaming to biblioteka ładowania danych typu open source. Umożliwia jednowęzłowe lub rozproszone trenowanie i ewaluację modeli uczenia głębokiego z zestawów danych, które są już załadowane jako ramki danych Apache Spark. Mozaika Streaming przede wszystkim obsługuje Mozaika Composer, ale także integruje się z natywnym PyTorch, PyTorch Lightning oraz TorchDistributor. Usługa Mosaic Streaming oferuje szereg korzyści płynących z tradycyjnych modułów PyTorch DataLoaders, w tym:

  • Zgodność z dowolnym typem danych, w tym obrazami, tekstem, wideo i danymi wielomodalnymi.
  • Obsługa głównych dostawców magazynu w chmurze (AWS, OCI, GCS, Azure, Databricks UC Volume i dowolnych zgodnych magazynów obiektów S3, takich jak Cloudflare R2, Coreweave, Backblaze b2 itp.)
  • Maksymalizowanie gwarancji poprawności, wydajności, elastyczności i łatwości użycia. Aby uzyskać więcej informacji, odwiedź ich stronę kluczowych funkcji.

Aby uzyskać ogólne informacje na temat Mosaic Streaming, zapoznaj się z dokumentacją Streaming API.

Uwaga

Usługa Mosaic Streaming została wstępnie zainstalowana we wszystkich wersjach środowiska Databricks Runtime 15.2 ML i nowszych.

Ładowanie danych z ramek danych Spark przy użyciu strumieniowania Mosaic.

Mosaic Streaming zapewnia prosty przepływ pracy do konwertowania z Apache Spark na format Mozaikowego Fragmentu Danych (MDS), który można następnie załadować do użycia w środowisku rozproszonym.

Zalecany przepływ pracy to:

  1. Ładowanie i opcjonalne wstępne przetwarzanie danych przy użyciu platformy Apache Spark.
  2. Użyj streaming.base.converters.dataframe_to_mds do zapisania ramki danych na dysku w celu przechowywania tymczasowego i/lub do woluminu katalogu Unity na potrzeby przechowywania trwałego. Te dane będą przechowywane w formacie MDS i można je dodatkowo zoptymalizować z obsługą kompresji i skrótów. Zaawansowane przypadki użycia mogą również obejmować przetwarzanie wstępne danych za pomocą funkcji zdefiniowanych przez użytkownika. Aby uzyskać więcej informacji, zobacz samouczek Spark DataFrame to MDS.
  3. Użyj streaming.StreamingDataset polecenia , aby załadować niezbędne dane do pamięci. StreamingDataset jest wersją IterableDataset PyTorch, która oferuje deterministyczne i elastyczne mieszanie, co umożliwia szybkie wznowienie w połowie epoki. Aby uzyskać więcej informacji, zobacz dokumentację usługi StreamingDataset.
  4. Użyj streaming.StreamingDataLoader do załadowania niezbędnych danych do treningu, oceny i testowania. StreamingDataLoader to wersja modułu DataLoader firmy PyTorch, która udostępnia dodatkowy interfejs punktu kontrolnego/wznowienia, dla którego śledzi liczbę próbek widocznych przez model w tej klasyfikacji.

Aby zapoznać się z przykładem kompleksowym, zobacz następujący notatnik.

Uproszczenie ładowania danych z platformy Spark do PyTorch przy użyciu notatnika Mosaic Streaming

Pobierz zeszyt

Rozwiązywanie problemów: błąd uwierzytelniania

Jeśli podczas ładowania danych z woluminu Unity Catalog przy użyciu StreamingDatasetzostanie wyświetlony następujący błąd, skonfiguruj zmienne środowiskowe, jak pokazano poniżej.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Uwaga

Jeśli ten błąd występuje podczas uruchamiania trenowania rozproszonego przy użyciu TorchDistributor, należy również ustawić zmienne środowiskowe na węzłach roboczych.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)