Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Horovod — это платформа распределенного обучения для таких библиотек, как TensorFlow и PyTorch. Horovod позволяет масштабировать существующий скрипт обучения таким образом, чтобы его можно выполнять сразу в нескольких сотнях GPU, написав всего несколько строчек кода.
В Azure Synapse Analytics пользователи могут быстро приступить к работе с Horovod с помощью среды выполнения Apache Spark 3 по умолчанию. Для приложений конвейера Spark ML пользователи могут использовать оценщик API horovod.spark с помощью PyTorch. В этой записной книжке используется датафрейм Apache Spark для распределенного обучения модели нейронной сети (DNN) на наборе данных MNIST. В этом руководстве используется PyTorch и Horovod Estimator для выполнения процесса обучения.
Необходимые условия
- Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. Для работы с файловой системой Data Lake Storage Gen2 вам необходимо быть участником данных Хранилища BLOB-объектов.
- Создайте пул Apache Spark с поддержкой GPU в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в разделе Создание пула Apache Spark с поддержкой GPU в Azure Synapse. В этом руководстве мы рекомендуем использовать кластер размера GPU-Large с тремя узлами.
Примечание.
Предварительный просмотр пулов Azure Synapse с возможностью использования GPU снят с поддержки.
Настройка сеанса Apache Spark
В начале сеанса необходимо настроить несколько параметров Apache Spark. В большинстве случаев нам нужно задать только параметры numExecutors и spark.rapids.memory.gpu.reserve. Для больших моделей пользователям также может потребоваться настроить параметр spark.kryoserializer.buffer.max
. Для моделей TensorFlow пользователям необходимо задать spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
значение true.
В примере показано, как можно передать конфигурации Spark с помощью %%configure
команды. Подробное значение каждого параметра см. в документации по конфигурации Apache Spark. Указанные значения — это предлагаемые значения, основанные на лучших практиках для пулов Azure Synapse GPU большого размера.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
В этом руководстве мы будем использовать следующие конфигурации:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Примечание.
При обучении с использованием Horovod пользователи должны настроить конфигурацию Spark таким образом, чтобы значение numExecutors
было меньше или равно количеству узлов.
Импорт зависимостей
В этом руководстве мы используем PySpark для чтения и обработки набора данных. Затем мы используем PyTorch и Horovod для создания модели распределенной нейронной сети (DNN) и запускаем учебный процесс. Чтобы приступить к работе, необходимо импортировать следующие зависимости:
# base libs
import sys
import uuid
# numpy
import numpy as np
# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
# azure related
from azure.synapse.ml.horovodutils import AdlsStore
Подключение к альтернативной учетной записи хранения
Нам нужна учетная запись Azure Data Lake Storage (ADLS) для хранения промежуточных и моделей данных. Если вы используете альтернативную учетную запись хранения, обязательно настройте связанную службу для автоматической проверки подлинности и чтения из учетной записи. Кроме того, необходимо изменить следующие свойства: remote_url
, account_name
и linked_service_name
.
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01 # learning rate for single node code
uuid_str = str(uuid.uuid4()) # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str
# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir
store = AdlsStore.create(adls_store_path,
storage_options={
'account_name': account_name,
'sas_token': sas_token
},
save_runs=True)
print(adls_store_path)
Подготовка набора данных
Затем мы подготовим набор данных для обучения. В этом руководстве мы будем использовать набор данных MNIST из Открытых наборов данных Azure.
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()
Обработка данных с помощью Apache Spark
Теперь мы создадим DataFrame Apache Spark. Этот фрейм данных будет использоваться для обучения вместе с HorovodEstimator
.
# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)
# repartition DataFrame for training
train_df = df.repartition(num_proc)
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])
# show the dataset
train_df.show()
train_df.count()
Определение модели DNN
Завершив обработку набора данных, теперь можно определить модель PyTorch. Тот же код можно также использовать для обучения модели PyTorch с одним узлом.
# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = x.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=lr_single_node * num_proc,
momentum=0.5) # notice the lr is scaled up
loss = nn.NLLLoss()
Обучение модели
Теперь мы можем обучать оценщик Horovod Spark на основе нашего dataframe Apache Spark.
# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
stdout=sys.stdout,
stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(
backend=backend,
store=store,
partitions_per_process=1, # important for GPU training
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
batch_size=batch_size,
epochs=epochs,
validation=0.1,
verbose=2)
torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
Оценка обученной модели
После завершения процесса обучения мы можем оценить модель в тестовом наборе данных.
# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)
argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
labelCol='label',
metricName='accuracy')
print('Test accuracy:', evaluator.evaluate(pred_df))
Очистка ресурсов
Чтобы правильно завершить работу экземпляра Spark, завершите все подключенные сессии и записные книжки. Пул Apache Spark завершит работу автоматически, когда истечет указанное для него время простоя. Можно также выполнить команду остановки сеанса из строки состояния в верхней правой части записной книжки.