Share via


Zelfstudie: Gedistribueerde training met Horovod Estimator en PyTorch (afgeschaft)

Horovod is een gedistribueerd trainingsframework voor bibliotheken zoals TensorFlow en PyTorch. Met Horovod kunnen gebruikers een bestaand trainingsscript omhoog schalen om in slechts een paar regels code op honderden GPU's uit te voeren.

In Azure Synapse Analytics kunnen gebruikers snel aan de slag met Horovod met behulp van de standaard Apache Spark 3-runtime. Voor Spark ML-pijplijntoepassingen met behulp van PyTorch kunnen gebruikers de horovod.spark estimator-API gebruiken. In dit notebook wordt een Apache Spark-dataframe gebruikt om gedistribueerde training uit te voeren van een DNN-model (gedistribueerd neuraal netwerk) op de MNIST-gegevensset. In deze zelfstudie worden PyTorch en de Horovod Estimator gebruikt om het trainingsproces uit te voeren.

Vereisten

  • Azure Synapse Analytics-werkruimte met een Azure Data Lake Storage Gen2-opslagaccount dat is geconfigureerd als de standaardopslag. U moet de bijdrager voor opslagblobgegevens zijn van het Data Lake Storage Gen2-bestandssysteem waarmee u werkt.
  • Maak een Apache Spark-pool met GPU in uw Azure Synapse Analytics-werkruimte. Zie Een Apache Spark-pool met GPU maken in Azure Synapse voor meer informatie. Voor deze zelfstudie raden we u aan om de grootte van het GPU-grote cluster met 3 knooppunten te gebruiken.

Notitie

De preview voor azure Synapse GPU-pools is nu afgeschaft.

Let op

Melding over afschaffing en uitschakeling voor GPU's in de Azure Synapse Runtime voor Apache Spark 3.1 en 3.2

  • De versnelde preview-versie van GPU is nu afgeschaft in de Apache Spark 3.2-runtime (afgeschaft). Afgeschafte runtimes hebben geen bug- en functiecorrecties. Deze runtime en de bijbehorende gpu versnelde preview op Spark 3.2 zijn vanaf 8 juli 2024 buiten gebruik gesteld en uitgeschakeld.
  • De versnelde preview-versie van GPU is nu afgeschaft in de Runtime van Azure Synapse 3.1 (afgeschaft). Azure Synapse Runtime voor Apache Spark 3.1 heeft het einde van de ondersteuning bereikt vanaf 26 januari 2023, waarbij officiële ondersteuning vanaf 26 januari 2024 is stopgezet en geen verdere adressering van ondersteuningstickets, bugfixes of beveiligingsupdates meer dan deze datum.

De Apache Spark-sessie configureren

Aan het begin van de sessie moeten we enkele Apache Spark-instellingen configureren. In de meeste gevallen hoeven we alleen de numExecutors en spark.rapids.memory.gpu.reserve in te stellen. Voor grote modellen moeten gebruikers mogelijk ook de spark.kryoserializer.buffer.max instelling configureren. Voor Tensorflow-modellen moeten gebruikers de spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH optie waar instellen.

In het voorbeeld ziet u hoe de Spark-configuraties kunnen worden doorgegeven met de %%configure opdracht. De gedetailleerde betekenis van elke parameter wordt uitgelegd in de Apache Spark-configuratiedocumentatie. De opgegeven waarden zijn de voorgestelde best practice-waarden voor Azure Synapse GPU-grote pools.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Voor deze zelfstudie gebruiken we de volgende configuraties:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Notitie

Bij het trainen met Horovod moeten gebruikers de Spark-configuratie zo numExecutors instellen dat ze kleiner of gelijk zijn aan het aantal knooppunten.

Afhankelijkheden importeren

In deze zelfstudie gebruiken we PySpark om de gegevensset te lezen en te verwerken. Vervolgens gebruiken we PyTorch en Horovod om het DNN-model (Distributed Neural Network) te bouwen en het trainingsproces uit te voeren. Om aan de slag te gaan, moeten we de volgende afhankelijkheden importeren:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Verbinding maken met een alternatief opslagaccount

We hebben het Azure Data Lake Storage-account (ADLS) nodig voor het opslaan van tussenliggende en modelgegevens. Als u een alternatief opslagaccount gebruikt, moet u de gekoppelde service zo instellen dat deze automatisch wordt geverifieerd en gelezen vanuit het account. Daarnaast moet u de volgende eigenschappen wijzigen: remote_url, account_nameen linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Gegevensset voorbereiden

Vervolgens gaan we de gegevensset voorbereiden voor training. In deze zelfstudie gebruiken we de MNIST-gegevensset van Azure Open Datasets.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Gegevens verwerken met Apache Spark

We gaan nu een Apache Spark-gegevensframe maken. Dit dataframe wordt gebruikt met de HorovodEstimator voor training.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

DNN-model definiëren

Zodra we klaar zijn met het verwerken van onze gegevensset, kunnen we nu ons PyTorch-model definiëren. Dezelfde code kan ook worden gebruikt om een PyTorch-model met één knooppunt te trainen.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Model trainen

Nu kunnen we een Horovod Spark-estimator trainen boven op ons Apache Spark-dataframe.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Getraind model evalueren

Zodra het trainingsproces is voltooid, kunnen we het model evalueren op de testgegevensset.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Resources opschonen

Om ervoor te zorgen dat de Spark-instantie wordt afgesloten, beëindigt u alle verbonden sessies (notebooks). De pool wordt afgesloten wanneer de niet-actieve tijd is bereikt die is opgegeven in de Apache Spark-pool. U kunt ook stoppen van de sessie selecteren in de statusbalk rechtsboven in het notitieblok.

Schermopname van de knop Sessie stoppen op de statusbalk.

Volgende stappen