Comparteix a través de


Tutorial: Entrenamiento distribuido con estimador Horovod y PyTorch (versión preliminar)

Horovod es un marco de entrenamiento distribuido para librerías como TensorFlow y PyTorch. Con Horovod, los usuarios pueden escalar verticalmente un script de entrenamiento existente para ejecutarse en cientos de GPU en unas pocas líneas de código.

En Azure Synapse Analytics, los usuarios pueden empezar a trabajar rápidamente con Horovod mediante el entorno de ejecución predeterminado de Apache Spark 3. En el caso de las aplicaciones de canalización de Spark ML mediante PyTorch, los usuarios pueden usar la API del estimador horovod.spark. Este cuaderno usa un dataframe de Apache Spark para realizar el entrenamiento distribuido de un modelo de red neuronal distribuida (DNN) en el conjunto de datos de MNIST. En este tutorial se usa PyTorch y el estimador Horovod para ejecutar el proceso de entrenamiento.

Requisitos previos

  • Necesitará un área de trabajo de Azure Synapse Analytics con una cuenta de almacenamiento de Azure Data Lake Storage Gen2 que esté configurada como almacenamiento predeterminado. Asegúrese de que es el colaborador de datos de Storage Blob en el sistema de archivos de Data Lake Storage Gen2 con el que trabaja.
  • Cree un grupo de Apache Spark habilitado para GPU en el área de trabajo de Azure Synapse Analytics. Para más información, consulte Creación de un grupo de Apache Spark habilitado para GPU en Azure Synapse. En este tutorial, se recomienda usar el tamaño del clúster de GPU grande con 3 nodos.

Advertencia

  • La versión preliminar acelerada por GPU se limita al entorno de ejecución deApache Spark 3.2 (fin de soporte técnico anunciado). Final del soporte técnico anunciado para Azure Synapse Runtime para Apache Spark 3.2 se ha anunciado el 8 de julio de 2023. El final del soporte técnico anunciado en tiempo de ejecución no tendrá correcciones de errores y características. Las correcciones de seguridad se realizarán en función de la evaluación de riesgos. Este tiempo de ejecución y la versión preliminar acelerada de GPU correspondiente en Spark 3.2 se retirarán y deshabilitarán a partir del 8 de julio de 2024.
  • La versión preliminar acelerada por GPU ahora no se admite en el entorno de ejecución de Azure Synapse 3.1 (no compatible). El 26 de enero de 2023 el entorno de ejecución de Azure Synapse para Apache Spark 3.1 ha alcanzado su fin de soporte técnico, por lo que a partir del 26 de enero de 2024 se interrumpirá el soporte técnico oficial y no se atenderán más incidencias de soporte técnico, correcciones de errores ni actualizaciones de seguridad después de esta fecha.

Configuración de la sesión de Apache Spark

Al principio de la sesión, es necesario configurar algunas opciones de Apache Spark. En la mayoría de los casos, solo es necesario establecer los valores numExecutors y spark.rapids.memory.gpu.reserve. En el caso de los modelos grandes, es posible que los usuarios también necesiten configurar el valor spark.kryoserializer.buffer.max. En el caso de los modelos de Tensorflow, los usuarios deberán establecer spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH en true.

En el ejemplo, puede ver cómo se pueden pasar las configuraciones de Spark con el comando %%configure. El significado detallado de cada parámetro se explica en la documentación sobre la configuración de Apache Spark. Los valores que se proporcionan son los valores recomendados para grupos de GPU grande de Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

En este tutorial, usaremos las siguientes configuraciones:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Nota

Al entrenar con Horovod, los usuarios deben establecer la configuración de Spark para que numExecutors sea menor o igual que el número de nodos.

Dependencias de importación

En este tutorial, se usa PySpark para leer y procesar el conjunto de datos. A continuación, usamos PyTorch y Horovod para crear el modelo de red neuronal distribuida (DNN) y ejecutar el proceso de entrenamiento. Para empezar, es necesario importar las siguientes dependencias:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Conectar a una cuenta de almacenamiento alternativa

Necesitaremos la cuenta de Azure Data Lake Storage (ADLS) para almacenar datos intermedios y de modelos. Si usa una cuenta de almacenamiento alternativa, asegúrese de configurar el servicio vinculado para autenticarse y leer automáticamente desde la cuenta. Además, debe modificar las siguientes propiedades: remote_url, account_namey linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Preparación del conjunto de datos

A continuación, prepararemos el conjunto de datos para el entrenamiento. En este tutorial, usaremos el conjunto de datos de MNIST de Azure Open Datasets.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Procesar datos con Apache Spark

Ahora crearemos un dataframe de Apache Spark. Este dataframe se usará con HorovodEstimator para el entrenamiento.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

Definición de los modelos DNN

Una vez que hayamos terminado de procesar el conjunto de datos, podremos definir nuestro modelo de PyTorch. El mismo código también se puede usar para entrenar un modelo PyTorch de un solo nodo.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Entrenamiento de un modelo

Podemos entrenar un estimador de Horovod Spark sobre nuestro dataframe de Apache Spark.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Evaluar el modelo entrenado

Una vez finalizado el proceso de entrenamiento, podemos evaluar el modelo en el conjunto de datos de prueba.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Limpieza de recursos

Para asegurarse de que se cierra la instancia de Spark, finalice todas las sesiones (cuadernos) conectadas. El grupo se cierra cuando se alcanza el tiempo de inactividad especificado en el grupo de Apache Spark. También puede decidir finalizar la sesión en la barra de estado en la parte superior derecha del cuaderno.

Captura de pantalla que muestra el botón Detener sesión en la barra de estado.

Pasos siguientes