Adatok betöltése a Mozaik streameléssel

Ez a cikk azt ismerteti, hogyan konvertálhat adatokat az Apache Sparkból a PyTorch-kompatibilis formátumba a Mozaik streamelés használatával.

A Mosaic Streaming egy nyílt forráskódú adatbetöltő könyvtár. Lehetővé teszi a mélytanulási modellek egycsomópontos vagy elosztott betanítását és kiértékelését olyan adathalmazokból, amelyek már Apache Spark DataFrame-ként vannak betöltve. A Mozaikstreamelés elsősorban a Mozaik zeneszerzőt támogatja, de integrálható a natív PyTorch, a PyTorch Lightning és a TorchDistributor használatával is. A Mozaik streamelés számos előnnyel jár a hagyományos PyTorch DataLoaderek esetében, többek között a következőket:

  • Kompatibilitás bármilyen adattípussal, beleértve a képeket, szövegeket, videókat és multimodális adatokat.
  • A fő felhőalapú tárolószolgáltatók (AWS, OCI, GCS, Azure, Databricks UC Volume és bármilyen S3 kompatibilis objektumtároló, például Cloudflare R2, Coreweave, Backblaze b2 stb.) támogatása
  • A helyesség maximalizálása garantálja a teljesítményt, a rugalmasságot és a könnyű használatot. További információkért tekintse meg a főbb funkciók lapját .

A Mozaik streameléssel kapcsolatos általános információkért tekintse meg a Streaming API dokumentációját.

Megjegyzés

A Mozaikstreamelés előre telepítve lett a Databricks Runtime 15.2 ML és újabb verzióiban.

Adatok betöltése Spark-adatkeretekből a Mozaikstreamelés használatával

A Mozaikstreamelés egyszerű munkafolyamatot biztosít az Apache Sparkból a Mozaik adatszilánk (MDS) formátumba való konvertáláshoz, amely aztán betölthető elosztott környezetben való használatra.

Az ajánlott munkafolyamat a következő:

  1. Az Apache Spark használata adatok betöltésére és igény szerint előfeldolgozására.
  2. Az streaming.base.converters.dataframe_to_mds használatával az adatkeretet ideiglenes tároló lemezre és/vagy unitykatalógus-kötetre mentheti az állandó tároláshoz. Ezek az adatok MDS formátumban lesznek tárolva, és a tömörítés és kivonatolás támogatásával tovább optimalizálhatók. A speciális használati esetek magukban foglalhatják az adatok UDF-ek használatával történő előfeldolgozását is. További információért tekintse meg a Spark DataFrame–MDS oktatóanyagot .
  3. A szükséges adatok memóriába való betöltésére használható streaming.StreamingDataset . StreamingDataset A PyTorch IterableDataset egy olyan verziója, amely rugalmasan determinisztikus átrendezést tartalmaz, ami lehetővé teszi a gyors, közép-korszakközi újrakezdést. További információért tekintse meg a StreamingDataset dokumentációját .
  4. A betanításhoz/értékeléshez/teszteléshez szükséges adatok betöltésére használható streaming.StreamingDataLoader . StreamingDataLoader A PyTorch DataLoader egy verziója, amely egy további ellenőrzőpontot/újraindítási felületet biztosít, amely nyomon követi a modell által ebben a rangsorban látott minták számát.

Egy átfogó példáért tekintse meg a következő notebookot:

Adatbetöltés egyszerűsítése a Sparkból a PyTorchba a Mozaik streamelési jegyzetfüzet használatával

Jegyzetfüzet lekérése

Hibaelhárítás

Hitelesítési hiba

Ha az alábbi hibaüzenet jelenik meg, amikor adatokat tölt be egy Unity-katalógus kötetéből StreamingDatasethasználatával, állítsa be a környezeti változókat az alábbiak szerint.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Feljegyzés

Ha ez a hiba akkor jelenik meg, ha elosztott betanítást futtat a TorchDistributorhasználatával, a környezeti változókat is be kell állítania a feldolgozó csomópontokon.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)

A Python 3.11 megosztott memóriával kapcsolatos problémái

A Python 3.11 megosztott memória-implementációjával StreamingDataset kapcsolatos problémák miatt átmeneti problémákba ütközhet a Databricks Runtime 15.4 LTS for Machine Learning. Ezeket a problémákat elkerülheti a Databricks Runtime 16.4 LTS for Machine Learningre való frissítéssel, mivel a Python 3.12 ezeket a problémákat kezeli.