Bagikan melalui


Tutorial: Pelatihan Terdistribusi dengan Horovod Estimator dan PyTorch (tidak digunakan lagi)

Horovod adalah kerangka kerja pelatihan terdistribusi untuk TensorFlow dan PyTorch. Dengan Horovod, pengguna dapat meningkatkan skrip pelatihan yang ada untuk dijalankan di ratusan GPU hanya dalam beberapa baris kode.

Dalam Azure Synapse Analytics, pengguna dapat dengan cepat memulai Horovod menggunakan runtime Apache Spark 3 default. Untuk aplikasi alur Spark ML menggunakan PyTorch, pengguna dapat menggunakan API estimator horovod.spark. Notebook ini menggunakan kerangka data Apache Spark untuk melakukan pelatihan terdistribusi model jaringan neural terdistribusi (DNN) pada himpunan data MNIST. Tutorial ini menggunakan PyTorch dan Horovod Estimator untuk menjalankan proses pelatihan.

Prasyarat

  • Ruang kerja Azure Synapse Analytics dengan akun penyimpanan Azure Data Lake Storage Gen2 yang dikonfigurasi sebagai penyimpanan default. Anda harus menjadi Penyumbang Data Blob Penyimpanan untuk file sistem Data Lake Storage Gen2 yang Anda gunakan.
  • Buat kumpulan Apache Spark dengan dukungan GPU di ruang kerja Azure Synapse Analytics Anda. Untuk detail selengkapnya, lihat Membuat kumpulan Apache Spark dengan dukungan GPU di Azure Synapse. Untuk tutorial ini, sebaiknya gunakan ukuran kluster GPU-Besar dengan 3 node.

Catatan

Fitur pratinjau untuk pool yang mendukung GPU di Azure Synapse tidak lagi didukung.

Mengonfigurasi sesi Apache Spark

Pada awal sesi, kita perlu mengonfigurasi beberapa pengaturan Apache Spark. Dalam kebanyakan kasus, kita hanya perlu menyetel numExecutors dan spark.rapids.memory.gpu.reserve. Untuk model besar, pengguna mungkin juga perlu mengonfigurasi spark.kryoserializer.buffer.max pengaturan. Untuk model TensorFlow, pengguna perlu mengatur spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH menjadi true.

Dalam contoh, Anda dapat melihat bagaimana konfigurasi Spark dapat diteruskan dengan %%configure perintah . Arti terperinci dari setiap parameter dijelaskan dalam Dokumentasi konfigurasi Apache Spark. Nilai yang disediakan adalah nilai praktik terbaik yang disarankan untuk kumpulan besar GPU Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Untuk tutorial ini, kita akan menggunakan konfigurasi berikut:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Catatan

Saat berlatih dengan Horovod, pengguna harus mengatur konfigurasi Spark agar numExecutors kurang atau sama dengan jumlah node.

Mengimpor dependensi

Dalam tutorial ini, kami menggunakan PySpark untuk membaca dan memproses himpunan data. Kemudian, kami menggunakan PyTorch dan Horovod untuk membangun model jaringan neural terdistribusi (DNN) dan menjalankan proses pelatihan. Untuk memulai, kita perlu mengimpor dependensi berikut:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Menyambungkan ke akun penyimpanan alternatif

Kita memerlukan akun Azure Data Lake Storage (ADLS) untuk menyimpan data perantara dan model. Jika Anda menggunakan akun penyimpanan alternatif, pastikan untuk menyiapkan layanan tertaut untuk mengautentikasi dan membaca secara otomatis dari akun. Selain itu, Anda perlu memodifikasi properti berikut: remote_url, , account_namedan linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Menyiapkan himpunan data

Selanjutnya, kita akan menyiapkan himpunan data untuk pelatihan. Dalam tutorial ini, kita akan menggunakan himpunan data MNIST dari Azure Open Datasets.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Memproses data dengan Apache Spark

Sekarang, kita akan membuat dataframe Apache Spark. Kerangka data ini akan digunakan dengan HorovodEstimator untuk pelatihan.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

Menentukan model DNN

Setelah selesai memproses himpunan data, kita sekarang dapat menentukan model PyTorch kita. Kode yang sama juga dapat digunakan untuk melatih model PyTorch node tunggal.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Latih model

Sekarang, kita dapat melatih estimator Horovod Spark di atas kerangka data Apache Spark.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Mengevaluasi model terlatih

Setelah proses pelatihan selesai, kita kemudian dapat mengevaluasi model pada himpunan data pengujian.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Membersihkan sumber daya

Untuk memastikan instans Spark dimatikan, akhiri semua sesi yang tersambung (notebook). Kolam dihentikan sementara ketika waktu siaga yang ditentukan di kolam Apache Spark tercapai. Anda juga dapat memilih hentikan sesi dari bilah status di kanan atas buku catatan.

Cuplikan layar memperlihatkan tombol Hentikan sesi pada bilah status.

Langkah berikutnya