你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:使用 Horovod 估算器和 PyTorch 进行分布式训练(预览版)

Horovod 是一个分布式训练框架,适用于 TensorFlow 和 PyTorch 等库。 使用 Horovod,用户只需几行代码即可纵向扩展现有训练脚本,以在数百个 GPU 上运行。

在 Azure Synapse Analytics 中,用户可以使用默认的 Apache Spark 3 运行时快速开始使用 Horovod。 对于使用 PyTorch 的 Spark ML 管道应用程序,用户可以使用 horovod.spark 估算器 API。 此笔记本使用 Apache Spark 数据帧在 MNIST 数据集上执行分布式神经网络 (DNN) 模型的分布式训练。 本教程使用 PyTorch 和 Horovod 估算器运行训练过程。

先决条件

警告

  • GPU 加速预览版仅限用于 Apache Spark 3.2(已宣布终止支持)运行时。 已于 2023 年 7 月 8 日宣布终止对 Azure Synapse Runtime for Apache Spark 3.2 的支持。 已宣布终止支持的运行时不会收到 bug 和功能修复。 安全修补程序将基于风险评估进行向后移植。 自 2024 年 7 月 8 日起,此运行时以及 Spark 3.2 上的相应 GPU 加速预览版将会停用并被禁用。
  • 目前,GPU 加速预览版在 Azure Synapse 3.1(不支持)运行时上不受支持。 Azure Synapse Runtime for Apache Spark 3.1 已于 2023 年 1 月 26 日终止支持,并于 2024 年 1 月 26 日起停止提供官方支持,即在此日期之后不再处理支持工单以及提供 bug 修复或安全更新。

配置 Apache Spark 会话

在会话开始时,我们需要配置一些 Apache Spark 设置。 在大多数情况下,只需设置 numExecutors 和 spark.rapids.memory.gpu.reserve。 对于大型模型,用户可能还需要配置 spark.kryoserializer.buffer.max 设置。 对于 Tensorflow 模型,用户需要将 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH 设置为 true。

在此示例中,你可以看到如何使用 %%configure 命令传递 Spark 配置。 Apache Spark 配置文档中详细解释了每个参数的含义。 提供的值是建议用于 Azure Synapse GPU 大型池的最佳做法值。


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

在本教程中,我们将使用以下配置:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

注意

使用 Horovod 进行训练时,用户应将 numExecutors 的 Spark 配置设置为小于或等于节点数。

导入依赖项

在本教程中,我们使用 PySpark 读取和处理数据集。 然后使用 PyTorch 和 Horovod 构建分布式神经网络 (DNN) 模型并运行训练过程。 若要开始操作,需要导入以下依赖项:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

连接到备用存储帐户

需要使用 Azure Data Lake Storage (ADLS) 帐户来存储中间数据和模型数据。 如果使用备用存储帐户,请确保对链接服务进行设置,以自动对帐户进行身份验证和读取。 此外,还需要修改以下属性:remote_urlaccount_namelinked_service_name

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

准备数据集

接下来,我们将准备数据集进行训练。 在本教程中,我们将使用 Azure 开放数据集中的 MNIST 数据集。

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

使用 Apache Spark 处理数据

现在,我们将创建 Apache Spark 数据帧。 此数据框将与 HorovodEstimator 一起用于训练。

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

定义 DNN 模型

数据集处理完成后,现在可以定义 PyTorch 模型。 同一代码还可用于训练单节点 PyTorch 模型。

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

定型模型

现在,我们可以在 Apache Spark 数据帧的基础上训练 Horovod Spark 估算器。

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

评估已训练的模型

训练过程完成后,可以在测试数据集上评估模型。

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

清理资源

为了确保关闭 Spark 实例,请结束任何已连接的会话(笔记本)。 达到 Apache Spark 池中指定的空闲时间时,池将会关闭。 也可以从笔记本右上角的状态栏中选择“停止会话”。

显示状态栏上的“停止会话”按钮的屏幕截图。

后续步骤