Share via


Zelfstudie: Gedistribueerde training met Horovod Runner en TensorFlow (afgeschaft)

Horovod is een gedistribueerd trainingsframework voor bibliotheken zoals TensorFlow en PyTorch. Met Horovod kunnen gebruikers een bestaand trainingsscript omhoog schalen om in slechts een paar regels code op honderden GPU's uit te voeren.

In Azure Synapse Analytics kunnen gebruikers snel aan de slag met Horovod met behulp van de standaard Apache Spark 3-runtime. Voor Spark ML-pijplijntoepassingen die TensorFlow gebruiken, kunnen gebruikers dit gebruiken HorovodRunner. In dit notebook wordt een Apache Spark-dataframe gebruikt om gedistribueerde training uit te voeren van een DNN-model (gedistribueerd neuraal netwerk) op de MNIST-gegevensset. In deze zelfstudie wordt TensorFlow en het HorovodRunner trainingsproces uitgevoerd.

Vereisten

  • Azure Synapse Analytics-werkruimte met een Azure Data Lake Storage Gen2-opslagaccount dat is geconfigureerd als de standaardopslag. U moet de bijdrager voor opslagblobgegevens zijn van het Data Lake Storage Gen2-bestandssysteem waarmee u werkt.
  • Maak een Apache Spark-pool met GPU in uw Azure Synapse Analytics-werkruimte. Zie Een Apache Spark-pool met GPU maken in Azure Synapse voor meer informatie. Voor deze zelfstudie raden we u aan om de grootte van het GPU-grote cluster met 3 knooppunten te gebruiken.

Notitie

De preview voor azure Synapse GPU-pools is nu afgeschaft.

Let op

Melding over afschaffing en uitschakeling voor GPU's in de Azure Synapse Runtime voor Apache Spark 3.1 en 3.2

  • De versnelde preview-versie van GPU is nu afgeschaft in de Apache Spark 3.2-runtime (afgeschaft). Afgeschafte runtimes hebben geen bug- en functiecorrecties. Deze runtime en de bijbehorende gpu versnelde preview op Spark 3.2 zijn vanaf 8 juli 2024 buiten gebruik gesteld en uitgeschakeld.
  • De versnelde preview-versie van GPU is nu afgeschaft in de Runtime van Azure Synapse 3.1 (afgeschaft). Azure Synapse Runtime voor Apache Spark 3.1 heeft het einde van de ondersteuning bereikt vanaf 26 januari 2023, waarbij officiële ondersteuning vanaf 26 januari 2024 is stopgezet en geen verdere adressering van ondersteuningstickets, bugfixes of beveiligingsupdates meer dan deze datum.

De Apache Spark-sessie configureren

Aan het begin van de sessie moeten we enkele Apache Spark-instellingen configureren. In de meeste gevallen hoeven we alleen de numExecutors en spark.rapids.memory.gpu.reservein te stellen. Voor zeer grote modellen moeten gebruikers mogelijk ook de spark.kryoserializer.buffer.max instelling configureren. Voor TensorFlow-modellen moeten gebruikers de spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH optie waar instellen.

In het voorbeeld ziet u hoe de Spark-configuraties kunnen worden doorgegeven met de %%configure opdracht. De gedetailleerde betekenis van elke parameter wordt uitgelegd in de Apache Spark-configuratiedocumentatie. De opgegeven waarden zijn de voorgestelde best practice-waarden voor Azure Synapse GPU-grote pools.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Voor deze zelfstudie gebruiken we de volgende configuraties:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Notitie

Bij het trainen met Horovod moeten gebruikers de Spark-configuratie zo numExecutors instellen dat ze kleiner of gelijk zijn aan het aantal knooppunten.

Primair opslagaccount instellen

We hebben het Azure Data Lake Storage-account (ADLS) nodig voor het opslaan van tussenliggende en modelgegevens. Als u een alternatief opslagaccount gebruikt, moet u de gekoppelde service zo instellen dat deze automatisch wordt geverifieerd en gelezen vanuit het account.

In dit voorbeeld lezen we gegevens uit het primaire Azure Synapse Analytics-opslagaccount. Als u de resultaten wilt lezen, moet u de volgende eigenschappen wijzigen: remote_url

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Gegevensset voorbereiden

Vervolgens bereiden we de gegevensset voor op training. In deze zelfstudie gebruiken we de MNIST-gegevensset van Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

DNN-model definiëren

Zodra de gegevensset is verwerkt, kunnen we ons TensorFlow-model definiëren. Dezelfde code kan ook worden gebruikt om een TensorFlow-model met één knooppunt te trainen.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Een trainingsfunctie definiëren voor één knooppunt

Eerst trainen we ons TensorFlow-model op het stuurprogrammaknooppunt van de Apache Spark-pool. Zodra het trainingsproces is voltooid, evalueren we het model en drukken we de scores voor verlies en nauwkeurigheid af.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migreren naar HorovodRunner voor gedistribueerde training

Vervolgens bekijken we hoe dezelfde code opnieuw kan worden uitgevoerd voor HorovodRunner gedistribueerde training.

Trainingsfunctie definiëren

Als u een model wilt trainen, definiëren we eerst een trainingsfunctie voor HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Training uitvoeren

Zodra het model is gedefinieerd, kunnen we het trainingsproces uitvoeren.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Controlepunten opslaan in ADLS-opslag

De code laat zien hoe u de controlepunten opslaat in het Azure Data Lake Storage-account (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Getrainde Horovod-model evalueren

Zodra de modeltraining is voltooid, kunnen we kijken naar het verlies en de nauwkeurigheid van het uiteindelijke model.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Resources opschonen

Om ervoor te zorgen dat de Spark-instantie wordt afgesloten, beëindigt u alle verbonden sessies (notebooks). De pool wordt afgesloten wanneer de niet-actieve tijd is bereikt die is opgegeven in de Apache Spark-pool. U kunt ook stoppen van de sessie selecteren in de statusbalk rechtsboven in het notitieblok.

Schermopname van de knop Sessie stoppen op de statusbalk.

Volgende stappen