자습서: Horovod Runner 및 TensorFlow를 사용한 분산 학습(사용되지 않음)

Horovod는 TensorFlow 및 PyTorch와 같은 라이브러리를 위한 분산 학습 프레임워크입니다. Horovod를 사용하면 사용자가 몇 줄의 코드만으로 수백 개의 GPU에서 실행되도록 기존 학습 스크립트를 스케일 업할 수 있습니다.

Azure Synapse Analytics 내에서 사용자는 기본 Apache Spark 3 런타임을 사용하여 Horovod를 빠르게 시작할 수 있습니다. TensorFlow를 사용하는 Spark ML 파이프라인 애플리케이션의 경우 사용자는 HorovodRunner를 사용할 수 있습니다. 이 Notebook은 Apache Spark 데이터 프레임을 사용하여 MNIST 데이터 세트에서 DNN(분산 신경망) 모델의 분산 학습을 수행합니다. 이 자습서에서는 TensorFlow와 HorovodRunner를 사용하여 학습 프로세스를 실행합니다.

필수 조건

  • Azure Synapse Analytics 작업 영역(기본 스토리지로 구성된 Azure Data Lake Storage Gen2 스토리지 계정이 있음). 사용하는 Data Lake Storage Gen2 파일 시스템의 Storage Blob 데이터 기여자여야 합니다.
  • Azure Synapse Analytics 작업 영역에서 GPU 지원 Apache Spark 풀을 만듭니다. 자세한 내용은 Azure Synapse에서 GPU 지원 Apache Spark 풀 만들기를 참조하세요. 이 자습서에서는 노드가 3개인 GPU-Large 클러스터 크기를 사용하는 것이 좋습니다.

참고 항목

Azure Synapse GPU 지원 풀 미리 보기는 이제 더 이상 사용되지 않습니다.

Apache Spark 세션 구성

세션 시작 시 몇 가지 Apache Spark 설정을 구성해야 합니다. 대부분의 경우 numExecutorsspark.rapids.memory.gpu.reserve만 설정하면 됩니다. 매우 큰 모델의 경우 사용자가 spark.kryoserializer.buffer.max 설정을 구성해야 할 수도 있습니다. TensorFlow 모델의 경우 사용자는 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH를 true로 설정해야 합니다.

이 예에서는 %%configure 명령을 사용하여 Spark 구성을 전달하는 방법을 확인할 수 있습니다. 각 매개 변수의 자세한 의미는 Apache Spark 구성 설명서에 설명되어 있습니다. 제공된 값은 Azure Synapse GPU 대규모 풀에 대해 권장되는 모범 사례 값입니다.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

이 자습서에서는 다음 구성을 사용합니다.


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

참고 항목

Horovod를 사용하여 학습할 때 사용자는 numExecutors에 대한 Spark 구성을 노드 수보다 작거나 같도록 설정해야 합니다.

기본 스토리지 계정 설정

중간 및 모델 데이터를 저장하려면 ADLS(Azure Data Lake Storage) 계정이 필요합니다. 대체 스토리지 계정을 사용하는 경우 계정에서 자동으로 인증하고 읽도록 연결된 서비스를 설정해야 합니다.

이 예에서는 기본 Azure Synapse Analytics 스토리지 계정에서 데이터를 읽습니다. 결과를 읽으려면 다음 속성을 수정해야 합니다. remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

데이터 세트 준비

다음으로 학습을 위한 데이터 세트를 준비합니다. 이 자습서에서는 Azure Open Datasets의 MNIST 데이터 세트를 사용합니다.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

DNN 모델 정의

데이터 세트가 처리되면 TensorFlow 모델을 정의할 수 있습니다. 동일한 코드를 사용하여 단일 노드 TensorFlow 모델을 학습시킬 수도 있습니다.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

단일 노드에 대한 학습 함수 정의

먼저 Apache Spark 풀의 드라이버 노드에서 TensorFlow 모델을 학습합니다. 학습 과정이 완료되면 모델을 평가하고 손실 및 정확도 점수를 인쇄합니다.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

분산 학습을 위해 HorovodRunner로 마이그레이션

다음으로, 분산 학습에 HorovodRunner를 사용하여 동일한 코드를 다시 실행하는 방법을 살펴보겠습니다.

학습 함수 정의

모델을 학습하려면 먼저 HorovodRunner에 대한 학습 함수를 정의합니다.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

학습 실행

모델이 정의되면 학습 프로세스를 실행할 수 있습니다.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

ADLS 스토리지에 검사점 저장

이 코드는 검사점을 ADLS(Azure Data Lake Storage) 계정에 저장하는 방법을 보여 줍니다.

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Horovod 학습 모델 평가

모델 학습이 완료되면 최종 모델의 손실과 정확도를 확인할 수 있습니다.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

리소스 정리

Spark 인스턴스가 종료되도록 하려면 연결된 세션(Notebook)을 종료합니다. Apache Spark 풀에 지정된 유휴 시간에 도달하면 풀이 종료됩니다. Notebook 오른쪽 상단에 있는 상태 표시줄에서 세션 중지를 선택할 수도 있습니다.

상태 표시줄의 세션 중지 단추를 보여주는 스크린샷입니다.

다음 단계