Поделиться через


Учебное пособие: Распределенное обучение с Horovod Estimator и PyTorch (не рекомендуется)

Horovod — это платформа распределенного обучения для таких библиотек, как TensorFlow и PyTorch. Horovod позволяет масштабировать существующий скрипт обучения таким образом, чтобы его можно выполнять сразу в нескольких сотнях GPU, написав всего несколько строчек кода.

В Azure Synapse Analytics пользователи могут быстро приступить к работе с Horovod с помощью среды выполнения Apache Spark 3 по умолчанию. Для приложений конвейера Spark ML пользователи могут использовать оценщик API horovod.spark с помощью PyTorch. В этой записной книжке используется датафрейм Apache Spark для распределенного обучения модели нейронной сети (DNN) на наборе данных MNIST. В этом руководстве используется PyTorch и Horovod Estimator для выполнения процесса обучения.

Необходимые условия

  • Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. Для работы с файловой системой Data Lake Storage Gen2 вам необходимо быть участником данных Хранилища BLOB-объектов.
  • Создайте пул Apache Spark с поддержкой GPU в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в разделе Создание пула Apache Spark с поддержкой GPU в Azure Synapse. В этом руководстве мы рекомендуем использовать кластер размера GPU-Large с тремя узлами.

Примечание.

Предварительный просмотр пулов Azure Synapse с возможностью использования GPU снят с поддержки.

Настройка сеанса Apache Spark

В начале сеанса необходимо настроить несколько параметров Apache Spark. В большинстве случаев нам нужно задать только параметры numExecutors и spark.rapids.memory.gpu.reserve. Для больших моделей пользователям также может потребоваться настроить параметр spark.kryoserializer.buffer.max. Для моделей TensorFlow пользователям необходимо задать spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH значение true.

В примере показано, как можно передать конфигурации Spark с помощью %%configure команды. Подробное значение каждого параметра см. в документации по конфигурации Apache Spark. Указанные значения — это предлагаемые значения, основанные на лучших практиках для пулов Azure Synapse GPU большого размера.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

В этом руководстве мы будем использовать следующие конфигурации:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Примечание.

При обучении с использованием Horovod пользователи должны настроить конфигурацию Spark таким образом, чтобы значение numExecutors было меньше или равно количеству узлов.

Импорт зависимостей

В этом руководстве мы используем PySpark для чтения и обработки набора данных. Затем мы используем PyTorch и Horovod для создания модели распределенной нейронной сети (DNN) и запускаем учебный процесс. Чтобы приступить к работе, необходимо импортировать следующие зависимости:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Подключение к альтернативной учетной записи хранения

Нам нужна учетная запись Azure Data Lake Storage (ADLS) для хранения промежуточных и моделей данных. Если вы используете альтернативную учетную запись хранения, обязательно настройте связанную службу для автоматической проверки подлинности и чтения из учетной записи. Кроме того, необходимо изменить следующие свойства: remote_url, account_nameи linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Подготовка набора данных

Затем мы подготовим набор данных для обучения. В этом руководстве мы будем использовать набор данных MNIST из Открытых наборов данных Azure.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Обработка данных с помощью Apache Spark

Теперь мы создадим DataFrame Apache Spark. Этот фрейм данных будет использоваться для обучения вместе с HorovodEstimator.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

Определение модели DNN

Завершив обработку набора данных, теперь можно определить модель PyTorch. Тот же код можно также использовать для обучения модели PyTorch с одним узлом.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Обучение модели

Теперь мы можем обучать оценщик Horovod Spark на основе нашего dataframe Apache Spark.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Оценка обученной модели

После завершения процесса обучения мы можем оценить модель в тестовом наборе данных.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Очистка ресурсов

Чтобы правильно завершить работу экземпляра Spark, завершите все подключенные сессии и записные книжки. Пул Apache Spark завершит работу автоматически, когда истечет указанное для него время простоя. Можно также выполнить команду остановки сеанса из строки состояния в верхней правой части записной книжки.

Снимок экрана: кнопка остановки сеанса в строке состояния.

Следующие шаги