Compartilhar via


Tutorial: treinamento distribuído com Horovod Estimator e PyTorch (preterido)

O Horovod é uma estrutura de treinamento distribuída para bibliotecas como TensorFlow e PyTorch. Com o Horovod, os usuários podem escalar verticalmente um script de treinamento existente para ser executado em centenas de GPUs em apenas algumas linhas de código.

No Azure Synapse Analytics, os usuários podem começar rapidamente com o Horovod usando o tempo de execução padrão do Apache Spark 3. Para aplicativos de pipeline do Spark ML usando o PyTorch, os usuários podem usar a API do estimador horovod.spark. Este notebook usa um dataframe do Apache Spark para executar o treinamento distribuído de um modelo de DNN (rede neural distribuída) no conjunto de dados do MNIST. Este tutorial usa o PyTorch e o Horovod Estimator para executar o processo de treinamento.

Pré-requisitos

  • Workspace do Azure Synapse Analytics com uma conta de armazenamento do Azure Data Lake Storage Gen2 configurada como o armazenamento padrão. Você precisa ser Colaborador de Dados do Storage Blob do sistema de arquivos Data Lake Storage Gen2 com o qual você trabalha.
  • Crie um pool do Apache Spark habilitado para GPU no workspace do Azure Synapse Analytics. Para obter detalhes, confira Criar um pool do Apache Spark habilitado para GPU no Azure Synapse. Para este tutorial, sugerimos usar o tamanho de cluster GPU-Large com três nós.

Observação

A versão prévia dos pools habilitados para GPU do Azure Synapse foi preterida.

Cuidado

Notificação de depreciação e desativação para GPUs no Azure Synapse Runtime para Apache Spark 3.1 e 3.2

  • A visualização acelerada por GPU agora está obsoleta no tempo de execução do Apache Spark 3.2 (preterido). Os runtimes preteridos não terão correções de bugs e recursos. Esse tempo de execução e a visualização acelerada de GPU correspondente no Spark 3.2 foram descontinuados e desativados em 8 de julho de 2024.
  • A versão prévia acelerada por GPU agora está obsoleta no tempo de execução Azure Synapse 3.1 (preterido). O Runtime do Azure Synapse para Apache Spark 3.1 atingiu o fim do suporte em 26 de janeiro de 2023, tendo o suporte oficial sido descontinuado a partir de 26 de janeiro de 2024, bem como o endereçamento de tíquetes de suporte, correções de bugs ou atualizações de segurança após essa data.

Configurar a sessão do Apache Spark

No início da sessão, precisamos configurar algumas configurações do Apache Spark. Na maioria dos casos, só precisamos definir numExecutors e spark.rapids.memory.gpu.reserve. Para modelos grandes, os usuários também podem precisar definir a configuração spark.kryoserializer.buffer.max. Para modelos Tensorflow, os usuários precisam definir o spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH para ser true.

No exemplo, você pode ver como as configurações do Spark podem ser passadas com o comando %%configure. O significado detalhado de cada parâmetro é explicado na documentação de configuração do Apache Spark. Os valores fornecidos são os valores de práticas recomendadas sugeridos para pools grandes de GPU do Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Para este tutorial, usaremos as seguintes configurações:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Observação

Ao treinar com o Horovod, os usuários devem definir a configuração do Spark para numExecutors ser menor ou igual ao número de nós.

Importar dependências

Neste tutorial, usamos o PySpark para ler e processar o conjunto de dados. Em seguida, usamos PyTorch e Horovod para construir o modelo de rede neural distribuída (DNN) e executar o processo de treinamento. Para começar, precisamos importar as seguintes dependências:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Conexão para a conta de armazenamento alternativa

Precisamos da conta do Azure Data Lake Storage (ADLS) para armazenar dados intermediários e de modelo. Se você estiver usando uma conta de armazenamento alternativa, configure o serviço vinculado para autenticar e ler automaticamente a partir da conta. Além disso, você precisa modificar as seguintes propriedades: remote_url, account_name e linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Preparar o conjunto de dados

Em seguida, prepararemos o conjunto de dados para treinamento. Neste tutorial, usaremos o conjunto de dados MNIST do Azure Open Datasets.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Processar dados com o Apache Spark

Agora, criaremos um dataframe do Apache Spark. Esse dataframe será usado com o HorovodEstimator para treinamento.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

Definir modelo de DNN

Quando terminarmos de processar nosso conjunto de dados, agora podemos definir nosso modelo PyTorch. O mesmo código também pode ser usado para treinar um modelo PyTorch de nó único.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Treinar um modelo

Agora, podemos treinar um avaliador do Horovod Spark sobre nosso dataframe do Apache Spark.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Avaliar modelo treinado

Uma vez concluído o processo de treinamento, podemos então avaliar o modelo no conjunto de dados de teste.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Limpar os recursos

Para garantir que a instância do Spark seja desligada, encerre todas as sessões conectadas (notebooks). O pool é desligado quando o tempo ocioso especificado no Pool do Apache Spark é atingido. Você também pode selecionar encerrar sessão na barra de status na parte superior direita do notebook.

Captura de tela mostrando o botão Interromper sessão na barra de status.

Próximas etapas