Condividi tramite


Esercitazione: Training distribuito con il servizio di stima Horovod e PyTorch (anteprima)

Horovod è un framework di training distribuito per librerie come TensorFlow e PyTorch. Con Horovod, gli utenti possono aumentare le prestazioni di uno script di training esistente per l'esecuzione su centinaia di GPU in poche righe di codice.

In Azure Synapse Analytics gli utenti possono iniziare rapidamente a usare Horovod con il runtime predefinito di Apache Spark 3. Per le applicazioni della pipeline di Spark ML con PyTorch, gli utenti possono usare l'API di stima horovod.spark. Questo notebook usa un dataframe Apache Spark per eseguire il training distribuito di un modello di rete neurale distribuito (Distributed Neural Network) nel set di dati MNIST. In questa esercitazione viene usato PyTorch e il servizio di stima Horovod per eseguire il processo di training.

Prerequisiti

  • Area di lavoro di Azure Synapse Analytics con un account di archiviazione di Azure Data Lake Storage Gen2 configurato come risorsa archiviazione predefinita. È necessario essere il Collaboratore ai dati dei BLOB della risorsa di archiviazione del file system di Data Lake Storage Gen2 con cui si lavora.
  • Creare un pool di Apache Spark abilitato per GPU nell'area di lavoro di Azure Synapse Analytics. Per informazioni dettagliate, vedere Creare un pool di Apache Spark abilitato per GPU in Azure Synapse. Per questa esercitazione è consigliabile usare le dimensioni del cluster GPU-Large con 3 nodi.

Nota

L'anteprima per i pool abilitati per la GPU di Azure Synapse è stata deprecata.

Attenzione

Notifica relativa alla deprecazione e disabilitazione per le GPU nel runtime di Azure Synapse per Apache Spark 3.1 e 3.2

  • L'anteprima con accelerazione della GPU è ora deprecata nel runtime di Apache Spark 3.2 (deprecato). Per i runtime deprecati non sono disponibili correzioni di bug e funzionalità. Questo runtime e l'anteprima con accelerazione della GPU corrispondente in Spark 3.2 sono stati ritirati e disabilitati a partire dall'8 luglio 2024.
  • L'anteprima accelerata della GPU è ora deprecata nel runtime di Azure Synapse 3.1 (deprecato). Il runtime di Azure Synapse per Apache Spark 3.1 ha raggiunto la fine del supporto lo scorso 26 gennaio 2023. Il supporto ufficiale è stato sospeso il 26 gennaio 2024 e a partire da tale data non sono stati più risolti i problemi relativi ai ticket di supporto né fornite correzioni ai bug o aggiornamenti della sicurezza.

Configurare la sessione di Apache Spark

All'inizio della sessione, è necessario configurare alcune impostazioni di Apache Spark. Nella maggior parte dei casi, è sufficiente impostare numExecutors e spark.rapids.memory.gpu.reserve. Per i modelli molto grandi, è talvolta necessario configurare l'impostazione spark.kryoserializer.buffer.max. Per i modelli TensorFlow, gli utenti devono impostare il spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH su true.

Nell'esempio è possibile vedere come le configurazioni di Spark possono essere passate con il comando %%configure. Il significato dettagliato di ogni parametro è illustrato nella documentazione di configurazione di Apache Spark. I valori indicati sono i valori suggeriti delle procedure consigliate per i pool della GPU di Azure Synapse di grandi dimensioni.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Per questa esercitazione verranno usate le configurazioni seguenti:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Nota

Quando si esegue il training con Horovod, gli utenti devono impostare la configurazione di Spark per numExecutors in modo che sia minore o uguale al numero di nodi.

Importare le dipendenze

In questa esercitazione viene usato PySpark per leggere ed elaborare il set di dati. Vengono quindi usati PyTorch e Horovod per compilare il modello DNN (Distributed Neural Network) ed eseguire il processo di training. Per iniziare, è necessario importare le dipendenze seguenti:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Connettersi a un account di archiviazione alternativo

Per archiviare i dati intermedi e del modello, è necessario l'account di Azure Data Lake Storage (ADLS). Se si usa un account di archiviazione alternativo, assicurarsi di configurare il servizio collegato per eseguire automaticamente l'autenticazione e la lettura dall'account. È inoltre necessario modificare le proprietà seguenti: remote_url, account_name e linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Preparazione del set di dati

Si preparerà quindi il set di dati per il training. In questa esercitazione si userà il set di dati MNIST da set di dati aperti di Azure.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Elaborare i dati con Apache Spark

Verrà ora creato un dataframe apache Spark. Questo dataframe verrà usato con il HorovodEstimator per il training.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

Definire il modelli DNN

Dopo aver completato l'elaborazione del set di dati, è possibile definire il modello PyTorch. Lo stesso codice può essere usato anche per eseguire il training di un modello PyTorch a nodo singolo.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Eseguire il training del modello

È ora possibile eseguire il training di uno strumento di stima Horovod Spark sopra il dataframe di Apache Spark.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Valutare il modello sottoposto a training

Al termine del processo di training, è possibile valutare il modello nel set di dati di test.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Pulire le risorse

Per assicurarsi che l'istanza di Spark venga arrestata, terminare tutte le sessioni connesse (notebook). Il pool si arresta quando viene raggiunto il tempo di inattività specificato nel pool di Apache Spark. Si può anche selezionare Termina sessione sulla barra di stato nella parte destra superiore del notebook.

Screenshot che mostra il pulsante Arresta sessione sulla barra di stato.

Passaggi successivi