Udostępnij za pośrednictwem


Samouczek: trenowanie rozproszone za pomocą narzędzia Horovod Runner i TensorFlow (przestarzałe)

Horovod to rozproszona struktura szkoleniowa dla bibliotek, takich jak TensorFlow i PyTorch. Dzięki platformie Horovod użytkownicy mogą skalować istniejący skrypt szkoleniowy w górę, aby działał na setkach procesorów GPU w kilku wierszach kodu.

W usłudze Azure Synapse Analytics użytkownicy mogą szybko rozpocząć pracę z platformą Horovod przy użyciu domyślnego środowiska uruchomieniowego platformy Apache Spark 3. W przypadku aplikacji potoków Spark ML korzystających z biblioteki TensorFlow użytkownicy mogą używać HorovodRunner. Ten notatnik używa ramki danych platformy Apache Spark do przeprowadzania rozproszonego szkolenia modelu sieci neuronowej (DNN) na zestawie danych MNIST. W tym samouczku użyto biblioteki TensorFlow i elementu HorovodRunner , aby uruchomić proces trenowania.

Wymagania wstępne

Uwaga

Wersja zapoznawcza pul z obsługą GPU dla Azure Synapse przestała być wspierana.

Konfigurowanie sesji platformy Apache Spark

Na początku sesji musimy skonfigurować kilka ustawień platformy Apache Spark. W większości przypadków musimy ustawić tylko wartości numExecutors i spark.rapids.memory.gpu.reserve. W przypadku bardzo dużych modeli użytkownicy mogą również wymagać skonfigurowania ustawienia spark.kryoserializer.buffer.max. W przypadku modeli TensorFlow użytkownicy muszą ustawić spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH wartość true.

W tym przykładzie możesz zobaczyć, jak konfiguracje platformy Spark można przekazać za %%configure pomocą polecenia . Szczegółowe znaczenie każdego parametru wyjaśniono w dokumentacji konfiguracji platformy Apache Spark. Podane wartości to sugerowane wartości najlepszych rozwiązań dla dużych pul procesora GPU usługi Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Na potrzeby tego samouczka użyjemy następujących konfiguracji:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Uwaga

Podczas trenowania z Horovod użytkownicy powinni ustawić konfigurację Spark na numExecutors, aby była mniejsza lub równa liczbie węzłów.

Konfigurowanie podstawowego konta magazynowego

Potrzebujemy konta usługi Azure Data Lake Storage (ADLS) do przechowywania danych pośrednich i modelowych. Jeśli używasz alternatywnego konta przechowywania, pamiętaj, aby skonfigurować połączoną usługę, aby automatycznie uwierzytelniała i odczytywała dane z konta.

W tym przykładzie odczytujemy dane z podstawowego konta magazynowego usługi Azure Synapse Analytics. Aby odczytać wyniki, należy zmodyfikować następujące właściwości: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Przygotowywanie zestawu danych

Następnie przygotujemy zestaw danych do trenowania. W tym samouczku użyjemy zestawu danych MNIST z usługi Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Definiowanie modelu sieci neuronowej (DNN)

Po przetworzeniu zestawu danych możemy zdefiniować nasz model TensorFlow. Ten sam kod może również służyć do trenowania modelu TensorFlow z jednym węzłem.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Zdefiniować funkcję trenowania dla pojedynczego węzła

Najpierw trenujemy nasz model TensorFlow w węźle sterownika puli Apache Spark. Po zakończeniu procesu trenowania oceniamy model i wyświetlamy wyniki utraty i dokładności.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migrowanie do modułu HorovodRunner na potrzeby trenowania rozproszonego

Następnie przyjrzymy się, jak ten sam kod można ponownie uruchomić na potrzeby trenowania rozproszonego przy użyciu HorovodRunner.

Definiowanie funkcji trenowania

Aby wytrenować model, najpierw zdefiniujemy funkcję szkoleniową dla elementu HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Uruchamianie szkolenia

Po zdefiniowaniu modelu możemy uruchomić proces trenowania.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Zapisz punkty kontrolne do magazynu ADLS

Kod przedstawia sposób zapisywania punktów kontrolnych na koncie usługi Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Ocena modelu wytrenowanego przez Horovod

Po zakończeniu trenowania modelu możemy przyjrzeć się utracie i dokładności końcowego modelu.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Czyszczenie zasobów

Aby upewnić się, że instancja platformy Spark jest wyłączona, zakończ wszystkie połączone sesje i notatniki. Pula zostanie zamknięta po osiągnięciu czasu bezczynności określonego w puli platformy Apache Spark. Możesz również wybrać pozycję Zatrzymaj sesję na pasku stanu w prawym górnym rogu notesu.

Zrzut ekranu przedstawiający przycisk Zatrzymaj sesję na pasku stanu.

Następne kroki