Поделиться через


Руководство: Распределенное обучение с Horovod Runner и TensorFlow (не рекомендуется)

Horovod — это платформа распределенного обучения для таких библиотек, как TensorFlow и PyTorch. Horovod позволяет масштабировать существующий скрипт обучения таким образом, чтобы его можно выполнять сразу в нескольких сотнях GPU, написав всего несколько строчек кода.

В Azure Synapse Analytics пользователи могут быстро приступить к работе с Horovod с помощью среды выполнения Apache Spark 3 по умолчанию. Для приложений конвейера Машинного обучения Spark с помощью TensorFlow пользователи могут использовать HorovodRunner. В этой записной книжке используется фрейм данных Apache Spark для выполнения распределенного обучения модели нейронной сети (DNN) на наборе данных MNIST. В этом руководстве используются TensorFlow и HorovodRunner, чтобы запускать процесс обучения.

Предварительные требования

  • Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. Вам необходимо быть участником данных хранилища блобов в файловой системе Data Lake Storage второго поколения, с которой вы работаете.
  • Создайте пул Apache Spark с поддержкой GPU в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в разделе Создание пула Apache Spark с поддержкой GPU в Azure Synapse. В этом руководстве мы рекомендуем использовать кластер размера GPU-Large с тремя узлами.

Примечание.

Предварительная версия пулов Azure Synapse с поддержкой GPU теперь снята с поддержки.

Настройка сеанса Apache Spark

В начале сеанса необходимо настроить несколько параметров Apache Spark. В большинстве случаев нам нужно задать только numExecutors и spark.rapids.memory.gpu.reserve. Для очень больших моделей пользователям возможно также придется настроить параметр spark.kryoserializer.buffer.max. Для моделей TensorFlow пользователям необходимо задать spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH значение true.

В примере показано, как можно передать конфигурации Spark с помощью %%configure команды. Подробное значение каждого параметра см. в документации по конфигурации Apache Spark. Указанные значения являются рекомендуемыми для крупных пулов Azure Synapse GPU.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

В этом руководстве мы используем следующие конфигурации:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Примечание.

При обучении с использованием Horovod пользователи должны настроить конфигурацию Spark таким образом, чтобы значение numExecutors было меньше или равно количеству узлов.

Настройка основной учетной записи хранения

Нам нужна учетная запись Azure Data Lake Storage (ADLS) для хранения промежуточных и моделей данных. Если вы используете альтернативную учетную запись хранения, обязательно настройте в связанной службе автоматическую проверку подлинности и чтение данных из учетной записи.

В этом примере мы считываем данные из основной учетной записи хранения Azure Synapse Analytics. Чтобы прочитать результаты, необходимо изменить следующие свойства: remote_url

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Подготовка набора данных

Затем мы подготовим набор данных для обучения. В этом руководстве мы используем набор данных MNIST из Открытых наборов данных Azure.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Определение модели DNN

После обработки набора данных мы можем определить модель TensorFlow. Тот же код также можно использовать для обучения модели TensorFlow с одним узлом.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Определение функции обучения для одного узла

Сначала мы обучим модель TensorFlow на узле драйвера пула Apache Spark. После завершения процесса обучения мы оцениваем модель и выводим значения потерь и точности.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Переход на HorovodRunner для распределенного обучения

Далее мы рассмотрим, как выполнять один и тот же код, используя HorovodRunner для распределенного обучения.

Определение функции обучения

Чтобы обучить модель, сначала мы определим функцию обучения для HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Запустить обучение

После определения модели мы можем запустить процесс обучения.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Сохранение контрольных точек в хранилище ADLS

В коде показано, как сохранить контрольные точки в учетной записи Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Процесс оценки модели, обученной с использованием Horovod

После завершения обучения модели мы можем взглянуть на потерю и точность конечной модели.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Очистка ресурсов

Чтобы гарантировать завершение работы экземпляра Spark, завершите все подключенные сеансы (ноутбуки). Пул Apache Spark завершит работу автоматически, когда истечет указанное для него время простоя. Можно также выполнить команду остановки сеанса из строки состояния в верхней правой части записной книжки.

Снимок экрана: кнопка остановки сеанса в строке состояния.

Следующие шаги