Bagikan melalui


Memuat data menggunakan Mosaic Streaming

Artikel ini menjelaskan cara menggunakan Mosaic Streaming untuk mengonversi data dari Apache Spark ke format yang kompatibel dengan PyTorch.

Mosaic Streaming adalah pustaka pemuatan data sumber terbuka. Ini memungkinkan node tunggal atau pelatihan terdistribusi dan evaluasi model pembelajaran mendalam dari himpunan data yang sudah dimuat sebagai Apache Spark DataFrames. Mosaic Streaming terutama mendukung Mosaic Composer, tetapi juga terintegrasi dengan PyTorch asli, PyTorch Lightning, dan TorchDistributor. Mosaic Streaming memberikan serangkaian manfaat daripada PyTorch DataLoaders tradisional termasuk:

  • Kompatibilitas dengan jenis data apa pun, termasuk gambar, teks, video, dan data multimodal.
  • Dukungan untuk penyedia penyimpanan cloud utama (AWS, OCI, GCS, Azure, Databricks UC Volume, dan penyimpanan objek yang kompatibel dengan S3 seperti Cloudflare R2, Coreweave, Backblaze b2, dll.)
  • Memaksimalkan jaminan kebenaran, performa, fleksibilitas, dan kemudahan penggunaan. Untuk informasi selengkapnya, lihat halaman fitur utama mereka.

Untuk informasi umum tentang Mosaic Streaming, lihat dokumentasi API Streaming.

Catatan

Mosaic Streaming telah diinstal sebelumnya ke semua versi Databricks Runtime 15.2 ML dan yang lebih tinggi.

Memuat data dari Spark DataFrames menggunakan Mosaic Streaming

Mosaic Streaming menyediakan alur kerja mudah untuk mengonversi dari Apache Spark ke format Mosaic Data Shard (MDS) yang kemudian dapat dimuat untuk digunakan di lingkungan terdistribusi.

Alur kerja yang direkomendasikan adalah:

  1. Gunakan Apache Spark untuk memuat dan memproses data secara opsional.
  2. Gunakan streaming.base.converters.dataframe_to_mds untuk menyimpan dataframe ke disk untuk penyimpanan sementara dan/atau ke volume Katalog Unity untuk penyimpanan persisten. Data ini akan disimpan dalam format MDS dan dapat dioptimalkan lebih lanjut dengan dukungan untuk kompresi dan hashing. Kasus penggunaan tingkat lanjut juga dapat mencakup praproscesing data menggunakan UDF. Lihat tutorial Spark DataFrame ke MDS untuk informasi selengkapnya.
  3. Gunakan streaming.StreamingDataset untuk memuat data yang diperlukan ke memori. StreamingDataset adalah versi dari PyTorch IterableDataset yang mengedepankan pengacakan deterministik elastis, memungkinkan proses dimulainya kembali dengan cepat di tengah-tengah epoch. Lihat dokumentasi StreamingDataset untuk informasi selengkapnya.
  4. Gunakan streaming.StreamingDataLoader untuk memuat data yang diperlukan untuk pelatihan/evaluasi/pengujian. StreamingDataLoader adalah versi DataLoader PyTorch yang menyediakan antarmuka titik pemeriksaan/penerbitan ulang tambahan, yang melacak jumlah sampel yang dilihat oleh model dalam peringkat ini.

Untuk contoh lengkap, lihat buku catatan berikut:

Menyederhanakan pemuatan data dari Spark ke PyTorch menggunakan notebook Mosaic Streaming

Dapatkan buku catatan

Troubleshooting

Kesalahan autentikasi

Jika Anda melihat kesalahan berikut saat memuat data dari volume Katalog Unity menggunakan StreamingDataset, siapkan variabel lingkungan seperti yang ditunjukkan di bawah ini.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Catatan

Jika Anda melihat kesalahan ini saat menjalankan pelatihan terdistribusi menggunakan TorchDistributor, Anda juga harus mengatur variabel lingkungan pada simpul pekerja.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)

Masalah memori bersama (shared memory) Python 3.11

Karena masalah dengan implementasi memori bersama Python 3.11, StreamingDataset dapat mengalami masalah sementara pada Databricks Runtime 15.4 LTS untuk Pembelajaran Mesin. Anda dapat menghindari masalah ini dengan meningkatkan ke Databricks Runtime 16.4 LTS untuk Pembelajaran Mesin, karena Python 3.12 mengatasi masalah ini.