你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Horovod 是一个分布式训练框架,适用于 TensorFlow 和 PyTorch 等库。 使用 Horovod,用户只需几行代码即可纵向扩展现有训练脚本,以在数百个 GPU 上运行。
在 Azure Synapse Analytics 中,用户可以使用默认的 Apache Spark 3 运行时快速开始使用 Horovod。 对于使用 Tensorflow 的 Spark ML 管道应用程序,用户可以使用 HorovodRunner
。 此笔记本使用 Apache Spark 数据帧在 MNIST 数据集上执行分布式神经网络 (DNN) 模型的分布式训练。 本教程使用 Tensorflow 和 HorovodRunner
来运行训练过程。
先决条件
- Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
- 在 Azure Synapse Analytics 工作区中创建支持 GPU 的 Apache Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建支持 GPU 的 Apache Spark 池。 对于本教程,建议使用具有 3 个节点的大型 GPU 群集。
注意
已启用 Azure Synapse GPU 的池的预览版现已弃用。
配置 Apache Spark 会话
在会话开始时,我们需要配置一些 Apache Spark 设置。 在大多数情况下,我们只需要设置 numExecutors
和 spark.rapids.memory.gpu.reserve
。 对于非常大的模型,用户可能还需要配置 spark.kryoserializer.buffer.max
设置。 对于 TensorFlow 模型,用户需要将 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
设置为 true。
在此示例中,你可以看到如何使用 %%configure
命令传递 Spark 配置。
Apache Spark 配置文档中详细解释了每个参数的含义。 提供的值是建议用于 Azure Synapse GPU 大型池的最佳做法值。
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
在本教程中,我们将使用以下配置:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
注意
使用 Horovod 进行训练时,用户应将 numExecutors
的 Spark 配置设置为小于或等于节点数。
设置主存储帐户
需要使用 Azure Data Lake Storage (ADLS) 帐户来存储中间数据和模型数据。 如果使用备用存储帐户,请确保对链接服务进行设置,以自动对帐户进行身份验证和读取。
在此示例中,我们将从主 Azure Synapse Analytics 存储帐户读取数据。 若要读取结果,你需要修改以下属性:remote_url
。
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
准备数据集
接下来,我们将准备要用于训练的数据集。 在本教程中,我们将使用 Azure 开放数据集中的 MNIST 数据集。
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
定义 DNN 模型
处理数据集后,我们可以定义 TensorFlow 模型。 同一代码还可用于训练单节点 TensorFlow 模型。
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
为单个节点定义训练函数
首先,我们将在 Apache Spark 池的驱动节点上训练我们的 TensorFlow 模型。 在训练过程完成后,我们将评估模型并打印损失和准确度分数。
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
迁移到 HorovodRunner 进行分布式训练
接下来,我们将看看如何使用 HorovodRunner
重新运行相同的代码以进行分布式训练。
定义训练函数
若要训练模型,我们将首先为 HorovodRunner
定义一个训练函数。
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
运行训练
定义模型后,我们可以运行训练过程。
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
将检查点保存到 ADLS 存储
下面的代码显示了如何将检查点保存到 Azure Data Lake Storage (ADLS) 帐户。
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
评估 Horovod 训练的模型
在模型训练完成后,我们可以查看最终模型的损失和准确度。
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
清理资源
为了确保关闭 Spark 实例,请结束任何已连接的会话(笔记本)。 达到 Apache Spark 池中指定的空闲时间时,池将会关闭。 也可以从笔记本右上角的状态栏中选择“停止会话”。