閱讀英文

共用方式為


教學課程:使用 Horovod 估算器和 PyTorch 的分散式定型 (已淘汰)

Horovod 為分散式定型架構,適用於 TensorFlow 和 PyTorch 這類程式庫。 透過 Horovod,使用者可相應增加現有的定型指令碼,只要幾行程式碼就能在數百個 GPU 上執行。

在 Azure Synapse Analytics 中,使用者可以使用預設的 Apache Spark 3 執行階段快速開始使用 Horovod。 針對使用 PyTorch 的 Spark ML 管線應用程式,使用者可以使用 horovod.spark 估計工具 API。 此筆記本會使用 Apache Spark 資料框架,在 MNIST 資料集上執行分散式神經網路 (DNN) 模型的分散式定型。 本教學課程會使用 PyTorch 和 Horovod 估算器來執行定型流程。

必要條件

  • Azure Synapse Analytics 工作區,其中 Azure Data Lake Storage Gen2 儲存體帳戶已設定為預設儲存體。 使用 Data Lake Storage Gen2 檔案系統時,您必須是該檔案系統的儲存體 Blob 資料參與者
  • 在 Azure Synapse Analytics 工作區中啟用 GPU 的 Apache Spark 集區。 如需詳細資訊,請參閱在 Azure Synapse 中建立已啟用 GPU 的 Apache Spark 集區。 在本教學課程中,我們建議使用具有 3 個節點 GPU-Large 叢集大小。

注意

Azure Synapse 啟用 GPU 預覽功能集區現已取代。

警告

取代和停用適用於 Apache Spark 3.1 和 3.2 的 Azure Synapse 執行階段上的 GPU 通知

  • Apache Spark 3.2 (取代) 執行階段上的 GPU 加速預覽現已被取代。 取代的執行階段不會有 Bug 和功能修正。 自 2024 年 7 月 8 日起,Spark 3.2 上的此執行階段和對應的 GPU 加速預覽已淘汰和停用。
  • Azure Synapse 3.1 (取代) 執行階段上的 GPU 加速預覽現已被取代。 自 2023 年 1 月 26 日起,適用於 Apache Spark 3.1 的 Azure Synapse 執行階段已終止支援,官方支援已於 2024 年 1 月 26 日失效,且自此日之後,無法再處理任何支援票證、錯誤修正或安全性更新。

設定 Apache Spark 工作階段

在工作階段開始時,我們必須設定一些 Apache Spark 設定。 在大部分情況下,我們只需要設定 numExecutors 和 spark.rapids.memory.gpu.reserve。 對於大型的模型,使用者可能也須進行 spark.kryoserializer.buffer.max 設定。 對於 Tensorflow 模型,使用者必須將 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH 設為 true。

在範例中,您可以看到如何使用 %%configure 命令傳遞 Spark 設定。 Apache Spark 設定文件中會說明每個參數的詳細意義。 所提供的值是 Azure Synapse GPU-large 集區的建議最佳做法值。


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

在本教學課程中,我們將使用下列設定:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

注意

使用 Horovod 進行定型時,使用者應將 numExecutors 的 Spark 設定設為小於或等於節點數目。

匯入相依性

在本教學課程中,我們會使用 PySpark 來讀取和處理資料集。 接著,我們會使用 PyTorch 和 Horovod 來建置分散式神經網路 (DNN) 模型,並執行定型流程。 若要開始使用,我們必須匯入下列相依性:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

連線至替代儲存體帳戶

我們需要 Azure Data Lake Storage (ADLS) 帳戶來儲存中繼和模型資料。 若使用替代的儲存體帳戶,請務必將連結的服務設為從帳戶自動驗證和讀取。 此外,您必須修改下列屬性:remote_urlaccount_namelinked_service_name

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

準備資料集

接下來,我們將準備資料集以進行定型。 在本教學課程中,我們將使用來自 Azure 開放資料集的 MNIST 資料集。

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

使用 Apache Spark 處理資料

現在我們將建立 Apache Spark 資料框架。 此資料框架將用於 HorovodEstimator 進行定型。

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

定義 DNN 模型

完成處理資料集後,現在便可定義 PyTorch 模型。 相同的程式碼也可用於單一節點 PyTorch 模型的定型。

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

定型模型

現在我們可以 Apache Spark 資料框架為基礎,以定型 Horovod Spark 估算器。

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

評估定型的模型

定型程式完成後,即可使用測試資料集評估模型。

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

清除資源

為確保 Spark 執行個體已關閉,請結束任何已連線的工作階段 (Notebook)。 當達到 Apache Spark 集區中指定的閒置時間時,集區就會關閉。 您也可以在筆記本右上方的狀態列,選取 [停止工作階段]

顯示狀態列上 [停止工作階段] 按鈕的螢幕擷取畫面。

下一步