Horovod는 TensorFlow 및 PyTorch와 같은 라이브러리를 위한 분산 학습 프레임워크입니다. Horovod를 사용하면 사용자가 몇 줄의 코드만으로 수백 개의 GPU에서 실행되도록 기존 학습 스크립트를 스케일 업할 수 있습니다.
Azure Synapse Analytics 내에서 사용자는 기본 Apache Spark 3 런타임을 사용하여 Horovod를 빠르게 시작할 수 있습니다. TensorFlow를 사용하는 Spark ML 파이프라인 애플리케이션의 경우 사용자는 HorovodRunner를 사용할 수 있습니다. 이 Notebook은 Apache Spark 데이터 프레임을 사용하여 MNIST 데이터 세트에서 DNN(분산 신경망) 모델의 분산 학습을 수행합니다. 이 자습서에서는 TensorFlow와 HorovodRunner를 사용하여 학습 프로세스를 실행합니다.
필수 조건
- Azure Synapse Analytics 작업 영역(기본 스토리지로 구성된 Azure Data Lake Storage Gen2 스토리지 계정이 있음). 사용하는 Data Lake Storage Gen2 파일 시스템의 Storage Blob 데이터 기여자여야 합니다.
- Azure Synapse Analytics 작업 영역에서 GPU 지원 Apache Spark 풀을 만듭니다. 자세한 내용은 Azure Synapse에서 GPU 지원 Apache Spark 풀 만들기를 참조하세요. 이 자습서에서는 노드가 3개인 GPU-Large 클러스터 크기를 사용하는 것이 좋습니다.
참고 항목
Azure Synapse GPU 지원 풀 미리 보기는 이제 더 이상 사용되지 않습니다.
Apache Spark 세션 구성
세션 시작 시 몇 가지 Apache Spark 설정을 구성해야 합니다. 대부분의 경우 numExecutors 및 spark.rapids.memory.gpu.reserve만 설정하면 됩니다. 매우 큰 모델의 경우 사용자가 spark.kryoserializer.buffer.max 설정을 구성해야 할 수도 있습니다. TensorFlow 모델의 경우 사용자는 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH를 true로 설정해야 합니다.
이 예에서는 %%configure 명령을 사용하여 Spark 구성을 전달하는 방법을 확인할 수 있습니다. 각 매개 변수의 자세한 의미는 Apache Spark 구성 설명서에 설명되어 있습니다. 제공된 값은 Azure Synapse GPU 대규모 풀에 대해 권장되는 모범 사례 값입니다.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
이 자습서에서는 다음 구성을 사용합니다.
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
참고 항목
Horovod를 사용하여 학습할 때 사용자는 numExecutors에 대한 Spark 구성을 노드 수보다 작거나 같도록 설정해야 합니다.
기본 스토리지 계정 설정
중간 및 모델 데이터를 저장하려면 ADLS(Azure Data Lake Storage) 계정이 필요합니다. 대체 스토리지 계정을 사용하는 경우 계정에서 자동으로 인증하고 읽도록 연결된 서비스를 설정해야 합니다.
이 예에서는 기본 Azure Synapse Analytics 스토리지 계정에서 데이터를 읽습니다. 결과를 읽으려면 다음 속성을 수정해야 합니다. remote_url.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
데이터 세트 준비
다음으로 학습을 위한 데이터 세트를 준비합니다. 이 자습서에서는 Azure Open Datasets의 MNIST 데이터 세트를 사용합니다.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
DNN 모델 정의
데이터 세트가 처리되면 TensorFlow 모델을 정의할 수 있습니다. 동일한 코드를 사용하여 단일 노드 TensorFlow 모델을 학습시킬 수도 있습니다.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
단일 노드에 대한 학습 함수 정의
먼저 Apache Spark 풀의 드라이버 노드에서 TensorFlow 모델을 학습합니다. 학습 과정이 완료되면 모델을 평가하고 손실 및 정확도 점수를 인쇄합니다.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
분산 학습을 위해 HorovodRunner로 마이그레이션
다음으로, 분산 학습에 HorovodRunner를 사용하여 동일한 코드를 다시 실행하는 방법을 살펴보겠습니다.
학습 함수 정의
모델을 학습하려면 먼저 HorovodRunner에 대한 학습 함수를 정의합니다.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
학습 실행
모델이 정의되면 학습 프로세스를 실행할 수 있습니다.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
ADLS 스토리지에 검사점 저장
이 코드는 검사점을 ADLS(Azure Data Lake Storage) 계정에 저장하는 방법을 보여 줍니다.
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Horovod 학습 모델 평가
모델 학습이 완료되면 최종 모델의 손실과 정확도를 확인할 수 있습니다.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
리소스 정리
Spark 인스턴스가 종료되도록 하려면 연결된 세션(Notebook)을 종료합니다. Apache Spark 풀에 지정된 유휴 시간에 도달하면 풀이 종료됩니다. Notebook 오른쪽 상단에 있는 상태 표시줄에서 세션 중지를 선택할 수도 있습니다.