Condividi tramite


Esercitazione: Training distribuito con Horovod Runner e TensorFlow (anteprima)

Horovod è un framework di training distribuito per librerie come TensorFlow e PyTorch. Con Horovod, gli utenti possono aumentare le prestazioni di uno script di training esistente per l'esecuzione su centinaia di GPU in poche righe di codice.

In Azure Synapse Analytics gli utenti possono iniziare rapidamente a usare Horovod usando il runtime predefinito di Apache Spark 3. Per le applicazioni pipeline spark ML che usano TensorFlow, gli utenti possono usare HorovodRunner. Questo notebook usa un dataframe Apache Spark per eseguire il training distribuito di un modello di rete neurale distribuito (Distributed Neural Network) nel set di dati MNIST. Questa esercitazione usa TensorFlow e per HorovodRunner eseguire il processo di training.

Prerequisiti

  • Area di lavoro di Azure Synapse Analytics con un account di archiviazione di Azure Data Lake Storage Gen2 configurato come risorsa archiviazione predefinita. È necessario essere il Collaboratore ai dati dei BLOB della risorsa di archiviazione del file system di Data Lake Storage Gen2 con cui si lavora.
  • Creare un pool di Apache Spark abilitato per GPU nell'area di lavoro di Azure Synapse Analytics. Per informazioni dettagliate, vedere Creare un pool di Apache Spark abilitato per GPU in Azure Synapse. Per questa esercitazione è consigliabile usare le dimensioni del cluster GPU-Large con 3 nodi.

Avviso

  • L'anteprima accelerata della GPU è limitata al runtime apache Spark 3.2 (fine del supporto annunciato). La fine del supporto annunciato per Il runtime di Azure Synapse per Apache Spark 3.2 è stata annunciata l'8 luglio 2023. La fine del supporto annuncia i runtime non includerà correzioni di bug e funzionalità. Le correzioni di sicurezza verranno backportate in base alla valutazione dei rischi. Questo runtime e l'anteprima accelerata della GPU corrispondente in Spark 3.2 verranno ritirati e disabilitati a partire dall'8 luglio 2024.
  • L'anteprima accelerata della GPU non è ora supportata nel runtime di Azure Synapse 3.1 (non supportato). Il runtime di Azure Synapse per Apache Spark 3.1 ha raggiunto la fine del supporto a partire dal 26 gennaio 2023, con il supporto ufficiale sospeso a partire dal 26 gennaio 2024 e non sono stati risolti ulteriormente i ticket di supporto, le correzioni di bug o gli aggiornamenti della sicurezza oltre questa data.

Configurare la sessione di Apache Spark

All'inizio della sessione è necessario configurare alcune impostazioni di Apache Spark. Nella maggior parte dei casi, è sufficiente impostare e numExecutorsspark.rapids.memory.gpu.reserve. Per i modelli molto grandi, gli utenti possono anche dover configurare l'impostazione spark.kryoserializer.buffer.max. Per i modelli TensorFlow, gli utenti devono impostare su spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH true.

Nell'esempio è possibile vedere come è possibile passare le configurazioni di Spark con il %%configure comando . Il significato dettagliato di ogni parametro è illustrato nella documentazione di configurazione di Apache Spark. I valori forniti sono i valori consigliati delle procedure consigliate per i pool gpu di Azure Synapse di grandi dimensioni.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Per questa esercitazione vengono usate le configurazioni seguenti:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Nota

Quando si esegue il training con Horovod, gli utenti devono impostare la configurazione di Spark per numExecutors in modo che sia minore o uguale al numero di nodi.

Configurare l'account di archiviazione primario

È necessario l'account azure Data Lake Archiviazione (ADLS) per l'archiviazione dei dati intermedi e del modello. Se si usa un account di archiviazione alternativo, assicurarsi di configurare il servizio collegato per eseguire automaticamente l'autenticazione e la lettura dall'account.

In questo esempio i dati vengono letti dall'account di archiviazione primario di Azure Synapse Analytics. Per leggere i risultati è necessario modificare le proprietà seguenti: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Preparazione del set di dati

Si prepara quindi il set di dati per il training. In questa esercitazione viene usato il set di dati MNIST da Set di dati aperti di Azure.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Definire il modelli DNN

Una volta elaborato il set di dati, è possibile definire il modello TensorFlow. Lo stesso codice può essere usato anche per eseguire il training di un modello TensorFlow a nodo singolo.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Definire una funzione di training per un singolo nodo

Prima di tutto, viene eseguito il training del modello TensorFlow nel nodo driver del pool di Apache Spark. Una volta completato il processo di training, si valuta il modello e si stampano i punteggi di perdita e accuratezza.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Eseguire la migrazione a HorovodRunner per il training distribuito

Si esaminerà quindi come potrebbe essere eseguito di nuovo lo stesso codice usando HorovodRunner per il training distribuito.

Definire la funzione di training

Per eseguire il training di un modello, viene prima definita una funzione di training per HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Eseguire il training

Dopo aver definito il modello, è possibile eseguire il processo di training.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Salvare i checkpoint nell'archiviazione Azure Data Lake Storage

Il codice illustra come salvare i checkpoint nell'account azure Data Lake Archiviazione (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Valutare il modello con training di Horovod

Una volta completato il training del modello, è possibile esaminare la perdita e l'accuratezza del modello finale.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Pulire le risorse

Per assicurarsi che l'istanza di Spark venga arrestata, terminare tutte le sessioni connesse (notebook). Il pool si arresta quando viene raggiunto il tempo di inattività specificato nel pool di Apache Spark. Si può anche selezionare Termina sessione sulla barra di stato nella parte destra superiore del notebook.

Screenshot che mostra il pulsante Arresta sessione sulla barra di stato.

Passaggi successivi