次の方法で共有


AI ランタイムにデータを読み込む

Important

単一ノード タスクの AI ランタイムは パブリック プレビュー段階です。 マルチ GPU ワークロード用の分散トレーニング API は ベータ版のままです。

このセクションでは、特に ML および DL アプリケーション用に AI ランタイムにデータを読み込む方法について説明します。 Spark Python API を使用してデータを読み込んで変換する方法の詳細については、 チュートリアル を参照してください。

Unity カタログが必要です。 AI ランタイムのすべてのデータ アクセスは、Unity カタログを経由します。 テーブルとボリュームは Unity カタログに登録され、ユーザーまたはサービス プリンシパルからアクセスできる必要があります。

表形式データを読み込む

Spark Connect を使用して 、Delta テーブルから表形式の機械学習データを読み込みます。

単一ノード トレーニングでは、PySpark メソッドを使用して Apache Spark DataFrames を pandas DataFrames に変換しtoPandas()必要に応じて PySpark メソッドを使用して NumPy 形式に変換できますto_numpy()

Spark Connect は、分析と名前解決を実行時間に遅延させ、コードの動作を変更する可能性があります。 Spark Connect と Spark クラシックの比較を参照してください。

Spark Connect では、Spark SQL、Spark 上の Pandas API、構造化ストリーミング、MLlib (DataFrame ベース) など、ほとんどの PySpark API がサポートされています。 サポートされている最新の API については、 PySpark API リファレンス ドキュメント を参照してください。

その他の制限事項については、「 サーバーレス コンピューティングの制限事項」を参照してください。

ボリュームを使用して大規模な Delta テーブルを読み込む

toPandas()で変換するには大きすぎる大きな差分テーブルの場合は、データを Unity カタログ ボリュームにエクスポートし、PyTorch または Hugging Face を使用して直接読み込みます。

# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

この方法では、トレーニング中の Spark のオーバーヘッドを回避し、単一 GPU と分散トレーニングの両方のワークフローに適しています。

ボリュームから非構造化データを読み込む

イメージ、オーディオ、テキスト ファイルなどの非構造化データの場合は、Unity カタログ ボリュームを使用します。 次の例は、ボリュームからファイルを読み取り、PyTorch Datasetで使用する方法を示しています。

# Read files from a UC volume
volume_path = "/Volumes/catalog/schema/my_volume/images/"

from torch.utils.data import Dataset
import os
from PIL import Image

class ImageDataset(Dataset):
    def __init__(self, root_dir):
        self.file_list = [os.path.join(root_dir, f) for f in os.listdir(root_dir)]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img = Image.open(self.file_list[idx])
        return img

@distributed デコレーター内でデータを読み込む

分散トレーニングに サーバーレス GPU API を 使用する場合は、 @distributed デコレーター内でデータ読み込みコードを移動します。 データセットのサイズは pickle で許可されている最大サイズを超える可能性があるため、次に示すように、デコレーター内でデータセットを生成することをお勧めします。

from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
    # Load data inside the decorator to avoid pickle serialization issues
    dataset = get_dataset(file_path)
    ...

データ読み込みのパフォーマンス

/Workspace ディレクトリと /Volumes ディレクトリは、リモート Unity カタログ ストレージでホストされます。 データセットが Unity カタログに格納されている場合、データの読み込み速度は使用可能なネットワーク帯域幅によって制限されます。 複数のエポックをトレーニングする場合は、最初にデータをローカルにコピーし、特に高速 NVMe SSD ストレージでホストされている /tmp ディレクトリにコピーすることをお勧めします。

データセットが大きい場合は、次の手法を使用してパフォーマンスを向上させることができます。

  • マルチエポック トレーニングのためにデータをローカルにキャッシュします。 /tmpにデータセットをコピーして、エポック間のアクセスを高速化します。

    import shutil
    shutil.copytree("/Volumes/catalog/schema/volume/dataset", "/tmp/dataset")
    
  • データ取得を並列化します。 複数のワーカーで Torch DataLoader を使用して、GPU 計算とデータ読み込みを重ね合わせて使用します。 num_workersを少なくとも 2 に設定します。 パフォーマンスを向上させるには、 num_workers (並列読み取りを増やす) または prefetch_factor (各ワーカープリフェッチ項目の数を増やす) を増やします。

    from torch.utils.data import DataLoader
    
    loader = DataLoader(
        dataset,
        batch_size=32,
        num_workers=2,
        prefetch_factor=2,
        pin_memory=True
    )
    
  • 大規模な表形式データセットには Spark Connect を使用します。 Spark Connect は、ほとんどの PySpark API をサポートし、分散読み取りを効率的に処理します。

ストリーミング データセット

メモリに収まらない非常に大規模なデータセットの場合は、ストリーミング アプローチを使用します。