Compartilhar via


Tutorial: Treinamento distribuído com o Horovod Runner e o TensorFlow (preterido)

O Horovod é uma estrutura de treinamento distribuída para bibliotecas como TensorFlow e PyTorch. Com o Horovod, os usuários podem escalar verticalmente um script de treinamento existente para ser executado em centenas de GPUs em apenas algumas linhas de código.

No Azure Synapse Analytics, os usuários podem começar rapidamente a usar o Horovod usando o runtime padrão do Apache Spark 3. Para aplicativos de pipeline do Spark ML usando o TensorFlow, os usuários podem usar HorovodRunner. Este notebook usa um dataframe do Apache Spark para executar o treinamento distribuído de um modelo de DNN (rede neural distribuída) no conjunto de dados do MNIST. Este tutorial usa o TensorFlow e o HorovodRunner para executar o processo de treinamento.

Pré-requisitos

  • Workspace do Azure Synapse Analytics com uma conta de armazenamento do Azure Data Lake Storage Gen2 configurada como o armazenamento padrão. Você precisa ser Colaborador de Dados do Storage Blob do sistema de arquivos Data Lake Storage Gen2 com o qual você trabalha.
  • Crie um pool do Apache Spark habilitado para GPU no workspace do Azure Synapse Analytics. Para obter detalhes, confira Criar um pool do Apache Spark habilitado para GPU no Azure Synapse. Para este tutorial, sugerimos usar o tamanho de cluster GPU-Large com três nós.

Observação

A versão prévia dos pools habilitados para GPU do Azure Synapse foi preterida.

Cuidado

Notificação de depreciação e desativação para GPUs no Azure Synapse Runtime para Apache Spark 3.1 e 3.2

  • A visualização acelerada por GPU agora está obsoleta no tempo de execução do Apache Spark 3.2 (preterido). Os runtimes preteridos não terão correções de bugs e recursos. Esse tempo de execução e a visualização acelerada de GPU correspondente no Spark 3.2 foram descontinuados e desativados em 8 de julho de 2024.
  • A versão prévia acelerada por GPU agora está obsoleta no tempo de execução Azure Synapse 3.1 (preterido). O Runtime do Azure Synapse para Apache Spark 3.1 atingiu o fim do suporte em 26 de janeiro de 2023, tendo o suporte oficial sido descontinuado a partir de 26 de janeiro de 2024, bem como o endereçamento de tíquetes de suporte, correções de bugs ou atualizações de segurança após essa data.

Configurar a sessão do Apache Spark

No início da sessão, precisamos definir algumas configurações do Apache Spark. Na maioria dos casos, só precisamos definir numExecutors e spark.rapids.memory.gpu.reserve. Para modelos muito grandes, os usuários também podem precisar definir a configuração spark.kryoserializer.buffer.max. Para modelos TensorFlow, os usuários precisam definir spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH como verdadeiro.

No exemplo, você pode ver como as configurações do Spark podem ser passadas com o comando %%configure. O significado detalhado de cada parâmetro é explicado na documentação de configuração do Apache Spark. Os valores fornecidos são os valores sugeridos e de prática recomendada para pools grandes de GPU do Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Para este tutorial, usamos as seguintes configurações:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Observação

Ao treinar com o Horovod, os usuários devem definir a configuração do Spark para numExecutors ser menor ou igual ao número de nós.

Configurar conta de armazenamento primário

Precisamos da conta do Azure Data Lake Storage (ADLS) para armazenar dados intermediários e de modelo. Se você estiver usando uma conta de armazenamento alternativa, configure o serviço vinculado para autenticar e ler automaticamente a partir da conta.

Neste exemplo, lemos dados da conta de armazenamento primária do Azure Synapse Analytics. Para ler os resultados, você precisa modificar as seguintes propriedades: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Preparar o conjunto de dados

Em seguida, preparamos o conjunto de dados para treinamento. Neste tutorial, usamos o conjunto de dados do MNIST do Conjunto de Dados em Aberto no Azure.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Definir modelo de DNN

Depois que nosso conjunto de dados for processado, poderemos definir nosso modelo TensorFlow. O mesmo código também pode ser usado para treinar um modelo TensorFlow de nó único.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Definir uma função de treinamento para um único nó

Primeiro, treinamos nosso modelo TensorFlow no nó de driver do Pool do Apache Spark. Depois que o processo de treinamento for concluído, avaliaremos o modelo e imprimiremos as pontuações de perda e precisão.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migrar para HorovodRunner para treinamento distribuído

A seguir, veremos como o mesmo código pode ser executado novamente usando HorovodRunner para treinamento distribuído.

Definir função de treinamento

Para treinar um modelo, primeiro definimos uma função de treinamento para HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Executar treinamento

Depois que o modelo for definido, poderemos executar o processo de treinamento.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Salvar pontos de verificação no armazenamento do ADLS

O código mostra como salvar os pontos de verificação na conta do Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Avaliar o modelo treinado do Horovod

Depois que o treinamento do modelo for concluído, poderemos dar uma olhada na perda e precisão do modelo final.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Limpar os recursos

Para garantir que a instância do Spark seja desligada, encerre todas as sessões conectadas (notebooks). O pool é desligado quando o tempo ocioso especificado no Pool do Apache Spark é atingido. Você também pode selecionar encerrar sessão na barra de status na parte superior direita do notebook.

Captura de tela mostrando o botão Interromper sessão na barra de status.

Próximas etapas