Zelfstudie: Gedistribueerde training met Horovod Estimator en PyTorch (afgeschaft)
Horovod is een gedistribueerd trainingsframework voor bibliotheken zoals TensorFlow en PyTorch. Met Horovod kunnen gebruikers een bestaand trainingsscript omhoog schalen om in slechts een paar regels code op honderden GPU's uit te voeren.
In Azure Synapse Analytics kunnen gebruikers snel aan de slag met Horovod met behulp van de standaard Apache Spark 3-runtime. Voor Spark ML-pijplijntoepassingen met behulp van PyTorch kunnen gebruikers de horovod.spark estimator-API gebruiken. In dit notebook wordt een Apache Spark-dataframe gebruikt om gedistribueerde training uit te voeren van een DNN-model (gedistribueerd neuraal netwerk) op de MNIST-gegevensset. In deze zelfstudie worden PyTorch en de Horovod Estimator gebruikt om het trainingsproces uit te voeren.
Vereisten
- Azure Synapse Analytics-werkruimte met een Azure Data Lake Storage Gen2-opslagaccount dat is geconfigureerd als de standaardopslag. U moet de bijdrager voor opslagblobgegevens zijn van het Data Lake Storage Gen2-bestandssysteem waarmee u werkt.
- Maak een Apache Spark-pool met GPU in uw Azure Synapse Analytics-werkruimte. Zie Een Apache Spark-pool met GPU maken in Azure Synapse voor meer informatie. Voor deze zelfstudie raden we u aan om de grootte van het GPU-grote cluster met 3 knooppunten te gebruiken.
Notitie
De preview voor azure Synapse GPU-pools is nu afgeschaft.
Let op
Melding over afschaffing en uitschakeling voor GPU's in de Azure Synapse Runtime voor Apache Spark 3.1 en 3.2
- De versnelde preview-versie van GPU is nu afgeschaft in de Apache Spark 3.2-runtime (afgeschaft). Afgeschafte runtimes hebben geen bug- en functiecorrecties. Deze runtime en de bijbehorende gpu versnelde preview op Spark 3.2 zijn vanaf 8 juli 2024 buiten gebruik gesteld en uitgeschakeld.
- De versnelde preview-versie van GPU is nu afgeschaft in de Runtime van Azure Synapse 3.1 (afgeschaft). Azure Synapse Runtime voor Apache Spark 3.1 heeft het einde van de ondersteuning bereikt vanaf 26 januari 2023, waarbij officiële ondersteuning vanaf 26 januari 2024 is stopgezet en geen verdere adressering van ondersteuningstickets, bugfixes of beveiligingsupdates meer dan deze datum.
De Apache Spark-sessie configureren
Aan het begin van de sessie moeten we enkele Apache Spark-instellingen configureren. In de meeste gevallen hoeven we alleen de numExecutors en spark.rapids.memory.gpu.reserve in te stellen. Voor grote modellen moeten gebruikers mogelijk ook de spark.kryoserializer.buffer.max
instelling configureren. Voor Tensorflow-modellen moeten gebruikers de spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
optie waar instellen.
In het voorbeeld ziet u hoe de Spark-configuraties kunnen worden doorgegeven met de %%configure
opdracht. De gedetailleerde betekenis van elke parameter wordt uitgelegd in de Apache Spark-configuratiedocumentatie. De opgegeven waarden zijn de voorgestelde best practice-waarden voor Azure Synapse GPU-grote pools.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Voor deze zelfstudie gebruiken we de volgende configuraties:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Notitie
Bij het trainen met Horovod moeten gebruikers de Spark-configuratie zo numExecutors
instellen dat ze kleiner of gelijk zijn aan het aantal knooppunten.
Afhankelijkheden importeren
In deze zelfstudie gebruiken we PySpark om de gegevensset te lezen en te verwerken. Vervolgens gebruiken we PyTorch en Horovod om het DNN-model (Distributed Neural Network) te bouwen en het trainingsproces uit te voeren. Om aan de slag te gaan, moeten we de volgende afhankelijkheden importeren:
# base libs
import sys
import uuid
# numpy
import numpy as np
# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
# azure related
from azure.synapse.ml.horovodutils import AdlsStore
Verbinding maken met een alternatief opslagaccount
We hebben het Azure Data Lake Storage-account (ADLS) nodig voor het opslaan van tussenliggende en modelgegevens. Als u een alternatief opslagaccount gebruikt, moet u de gekoppelde service zo instellen dat deze automatisch wordt geverifieerd en gelezen vanuit het account. Daarnaast moet u de volgende eigenschappen wijzigen: remote_url
, account_name
en linked_service_name
.
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01 # learning rate for single node code
uuid_str = str(uuid.uuid4()) # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str
# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir
store = AdlsStore.create(adls_store_path,
storage_options={
'account_name': account_name,
'sas_token': sas_token
},
save_runs=True)
print(adls_store_path)
Gegevensset voorbereiden
Vervolgens gaan we de gegevensset voorbereiden voor training. In deze zelfstudie gebruiken we de MNIST-gegevensset van Azure Open Datasets.
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()
Gegevens verwerken met Apache Spark
We gaan nu een Apache Spark-gegevensframe maken. Dit dataframe wordt gebruikt met de HorovodEstimator
voor training.
# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)
# repartition DataFrame for training
train_df = df.repartition(num_proc)
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])
# show the dataset
train_df.show()
train_df.count()
DNN-model definiëren
Zodra we klaar zijn met het verwerken van onze gegevensset, kunnen we nu ons PyTorch-model definiëren. Dezelfde code kan ook worden gebruikt om een PyTorch-model met één knooppunt te trainen.
# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = x.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=lr_single_node * num_proc,
momentum=0.5) # notice the lr is scaled up
loss = nn.NLLLoss()
Model trainen
Nu kunnen we een Horovod Spark-estimator trainen boven op ons Apache Spark-dataframe.
# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
stdout=sys.stdout,
stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(
backend=backend,
store=store,
partitions_per_process=1, # important for GPU training
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
batch_size=batch_size,
epochs=epochs,
validation=0.1,
verbose=2)
torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
Getraind model evalueren
Zodra het trainingsproces is voltooid, kunnen we het model evalueren op de testgegevensset.
# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)
argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
labelCol='label',
metricName='accuracy')
print('Test accuracy:', evaluator.evaluate(pred_df))
Resources opschonen
Om ervoor te zorgen dat de Spark-instantie wordt afgesloten, beëindigt u alle verbonden sessies (notebooks). De pool wordt afgesloten wanneer de niet-actieve tijd is bereikt die is opgegeven in de Apache Spark-pool. U kunt ook stoppen van de sessie selecteren in de statusbalk rechtsboven in het notitieblok.