Freigeben über


Tutorial: verteiltes Training mit Horovod Runner und TensorFlow (veraltet)

Horovod ist ein Framework für verteiltes Training für Bibliotheken wie TensorFlow und PyTorch. Mit Horovod können Benutzer ein bestehendes Trainingsskript mit nur wenigen Zeilen Code so hochskalieren, dass es auf Hunderten von GPUs ausgeführt werden kann.

Innerhalb von Azure Synapse Analytics können Benutzer schnell mit Horovod beginnen, indem sie die standardmäßige Apache Spark 3 Runtime verwenden. Für Spark ML-Pipelineanwendungen, die TensorFlow verwenden, können Sie HorovodRunner verwenden. Dieses Notebook verwendet einen Apache Spark-Dataframe, um ein verteiltes Training eines verteilten neuronalen Netzes (DNN) mit einem MNIST-Dataset durchzuführen. Dieses Tutorial nutzt TensorFlow und HorovodRunner, um den Trainingsprozess auszuführen.

Voraussetzungen

  • Azure Synapse Analytics-Arbeitsbereich mit einem als Standardspeicher konfigurierten Azure Data Lake Storage Gen2-Speicherkonto. Für das hier verwendete Data Lake Storage Gen2-Dateisystem müssen Sie über die Rolle Mitwirkender an Storage-Blobdaten verfügen.
  • Erstellen Sie einen GPU-fähigen Apache Spark-Pool in Ihrem Azure Synapse Analytics-Arbeitsbereich. Ausführliche Informationen finden Sie unter Erstellen eines GPU-fähigen Apache Spark-Pools in Azure Synapse. Für dieses Tutorial empfehlen wir die Verwendung der Clustergröße „GPU: Groß“ mit drei Knoten.

Hinweis

Die Vorschau für GPU-fähige Azure Synapse-Pools ist jetzt veraltet.

Achtung

Benachrichtigung zur Einstellung und Deaktivierung für GPUs unter der Azure Synapse-Runtime für Apache Spark 3.1 und 3.2

  • Die Vorschau mit GPU-Beschleunigung ist jetzt für die Apache Spark 3.2-Runtime (veraltet) veraltet. Für veraltete Runtimes werden keine Fehler- und Featurebehebungen mehr bereitgestellt. Diese Runtime und die entsprechende Vorschau mit GPU-Beschleunigung in Spark 3.2 wurden am 8. Juli 2024 eingestellt und deaktiviert.
  • Die Vorschau mit GPU-Beschleunigung ist jetzt für die Azure Synapse 3.1-Runtime (veraltet) veraltet. Azure Synapse Runtime für Apache Spark 3.1 hat am 26. Januar 2023 das Supportende erreicht. Der offizielle Support wurde am 26. Januar 2024 eingestellt, und Supporttickets, Fehlerbehebungen oder Sicherheitsupdates nach diesem Datum werden nicht mehr bearbeitet.

Konfigurieren der Apache Spark-Sitzung

Zu Beginn der Sitzung müssen wir ein paar Einstellungen für Apache Spark konfigurieren. In den meisten Fällen müssen wir nur numExecutors und spark.rapids.memory.gpu.reserve festlegen. Für sehr große Modelle müssen Benutzer möglicherweise auch die Einstellung spark.kryoserializer.buffer.max konfigurieren. Für TensorFlow-Modelle müssen die Benutzer spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH auf „true“ festlegen.

In diesem Beispiel können Sie sehen, wie die Spark-Konfigurationen mit dem Befehl %%configure übergeben werden können. Die genaue Bedeutung der einzelnen Parameter wird in der Dokumentation zur Apache Spark-Konfiguration erläutert. Die angegebenen Werte sind die vorgeschlagenen, bewährten Methoden für Azure Synapse-Pools vom Typ „GPU: Groß“.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Für dieses Tutorial verwenden wir die folgenden Konfigurationen:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Hinweis

Wenn Sie mit Horovod trainieren, sollten Benutzer die Spark-Konfiguration für numExecutors so festlegen, dass sie kleiner oder gleich der Anzahl der Knoten ist.

Einrichten des primären Speicherkontos

Für die Speicherung von Zwischen- und Modelldaten benötigen wir das Azure Data Lake Storage-Konto (ADLS). Wenn Sie ein alternatives Speicherkonto verwenden, müssen Sie den verknüpften Dienst so einrichten, dass er sich automatisch authentifiziert und von diesem Konto liest.

In diesem Beispiel lesen wir Daten aus dem primären Azure Synapse Analytics-Speicherkonto. Um das Ergebnis zu lesen, müssen Sie die folgenden Eigenschaften ändern: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Vorbereiten des Datasets

Als nächstes bereiten wir das Dataset für das Training vor. In diesem Tutorial verwenden wir das MNIST-Dataset von Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Definieren von DNN-Modellen

Sobald unser Dataset verarbeitet wurde, können wir unser TensorFlow-Modell definieren. Derselbe Code könnte auch verwendet werden, um ein TensorFlow-Modell mit einem einzelnen Knoten zu trainieren.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Definieren einer Trainingsfunktion für einen einzelnen Knoten

Zunächst trainieren wir unser TensorFlow-Modell auf dem Treiberknoten des Apache Spark-Pools. Sobald das Training abgeschlossen ist, werten wir das Modell aus und geben den Score für Verlust und Genauigkeit aus.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migrieren zu HorovodRunner für verteiltes Training

Als nächstes sehen wir uns an, wie derselbe Code mithilfe von HorovodRunner für verteiltes Training ausgeführt werden kann.

Definieren der Trainingsfunktion

Um ein Modell zu trainieren, definieren wir zunächst eine Trainingsfunktion für HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Ausführen des Trainings

Sobald das Modell definiert ist, können wir das Training ausführen.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Speichern von Prüfpunkten in ADLS-Speicher

Der Code zeigt, wie Sie die Prüfpunkte im ADLS-Konto (Azure Data Lake Storage) speichern.

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Auswerten des mit Horovod trainierten Modells

Nachdem das Training des Modells abgeschlossen ist, können wir einen Blick auf den Verlust und die Genauigkeit des endgültigen Modells werfen.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Bereinigen von Ressourcen

Beenden Sie alle verbundenen Sitzungen (Notebooks), um sicherzustellen, dass die Spark-Instanz heruntergefahren wird. Der Pool wird heruntergefahren, wenn die im Apache Spark-Pool angegebene Leerlaufzeit erreicht wird. Sie können auch auf der Statusleiste am oberen Rand des Notebooks die Option Sitzung beenden auswählen.

Screenshot der Schaltfläche „Sitzung beenden“ auf der Statusleiste

Nächste Schritte