Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Horovod to rozproszona struktura szkoleniowa dla bibliotek, takich jak TensorFlow i PyTorch. Dzięki platformie Horovod użytkownicy mogą skalować istniejący skrypt szkoleniowy w górę, aby działał na setkach procesorów GPU w kilku wierszach kodu.
W usłudze Azure Synapse Analytics użytkownicy mogą szybko rozpocząć pracę z platformą Horovod przy użyciu domyślnego środowiska uruchomieniowego platformy Apache Spark 3. W przypadku aplikacji potoków spark ML korzystających z rozwiązania PyTorch użytkownicy mogą używać interfejsu API narzędzia do szacowania horovod.spark. Ten notatnik używa ramki danych platformy Apache Spark do przeprowadzania rozproszonego szkolenia modelu sieci neuronowej (DNN) na zestawie danych MNIST. W tym samouczku użyto narzędzia PyTorch i narzędzia do szacowania horovod do uruchomienia procesu trenowania.
Wymagania wstępne
- Obszar roboczy Azure Synapse Analytics z kontem usługi Azure Data Lake Storage Gen2 skonfigurowanym jako magazyn domyślny. Musisz mieć uprawnienia Storage Blob Data Contributor w systemie plików usługi Data Lake Storage Gen2, z którym pracujesz.
- Utwórz pulę Apache Spark z obsługą GPU w obszarze roboczym usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz Tworzenie puli platformy Apache Spark z obsługą procesora GPU w usłudze Azure Synapse. Na potrzeby tego samouczka zalecamy użycie dużego rozmiaru klastra GPU z 3 węzłami.
Uwaga / Notatka
Wersja zapoznawcza pul z obsługą procesora GPU usługi Azure Synapse jest przestarzała.
Konfigurowanie sesji platformy Apache Spark
Na początku sesji musimy skonfigurować kilka ustawień platformy Apache Spark. W większości przypadków musimy ustawić tylko numExecutors i spark.rapids.memory.gpu.reserve. W przypadku dużych modeli użytkownicy mogą również wymagać skonfigurowania spark.kryoserializer.buffer.max ustawienia. W przypadku modeli TensorFlow użytkownicy muszą ustawić spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH wartość true.
W tym przykładzie możesz zobaczyć, jak konfiguracje platformy Spark można przekazać za %%configure pomocą polecenia . Szczegółowe znaczenie każdego parametru wyjaśniono w dokumentacji konfiguracji platformy Apache Spark. Podane wartości to sugerowane wartości najlepszych rozwiązań dla dużych pul procesora GPU usługi Azure Synapse.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Na potrzeby tego samouczka użyjemy następujących konfiguracji:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Uwaga / Notatka
Podczas trenowania z Horovod użytkownicy powinni ustawić konfigurację Spark na numExecutors, aby była mniejsza lub równa liczbie węzłów.
Importowanie zależności
W tym samouczku użyjemy narzędzia PySpark do odczytywania i przetwarzania zestawu danych. Następnie użyjemy platformY PyTorch i Horovod do utworzenia modelu rozproszonej sieci neuronowej (DNN) i uruchomienia procesu trenowania. Aby rozpocząć, musimy zaimportować następujące zależności:
# base libs
import sys
import uuid
# numpy
import numpy as np
# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
# azure related
from azure.synapse.ml.horovodutils import AdlsStore
Nawiązywanie połączenia z alternatywnym kontem magazynu
Potrzebujemy konta usługi Azure Data Lake Storage (ADLS) do przechowywania danych pośrednich i modelowych. Jeśli używasz alternatywnego konta magazynu, pamiętaj, aby skonfigurować połączoną usługę w celu automatycznego uwierzytelniania i odczytywania z konta. Ponadto należy zmodyfikować następujące właściwości: remote_url, account_namei linked_service_name.
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01 # learning rate for single node code
uuid_str = str(uuid.uuid4()) # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str
# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir
store = AdlsStore.create(adls_store_path,
storage_options={
'account_name': account_name,
'sas_token': sas_token
},
save_runs=True)
print(adls_store_path)
Przygotowywanie zestawu danych
Następnie przygotujemy zestaw danych do trenowania. W tym samouczku użyjemy zestawu danych MNIST z usługi Azure Open Datasets.
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()
Przetwarzanie danych za pomocą platformy Apache Spark
Teraz utworzymy ramkę danych platformy Apache Spark. Ta ramka danych będzie używana z elementem HorovodEstimator do trenowania.
# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)
# repartition DataFrame for training
train_df = df.repartition(num_proc)
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])
# show the dataset
train_df.show()
train_df.count()
Definiowanie modelu sieci neuronowej (DNN)
Po zakończeniu przetwarzania zestawu danych możemy teraz zdefiniować nasz model PyTorch. Ten sam kod może również służyć do trenowania modelu PyTorch z jednym węzłem.
# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = x.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=lr_single_node * num_proc,
momentum=0.5) # notice the lr is scaled up
loss = nn.NLLLoss()
Trenowanie modelu
Teraz możemy wytrenować narzędzie do szacowania platformy Horovod Spark na podstawie naszej ramki danych platformy Apache Spark.
# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
stdout=sys.stdout,
stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(
backend=backend,
store=store,
partitions_per_process=1, # important for GPU training
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
batch_size=batch_size,
epochs=epochs,
validation=0.1,
verbose=2)
torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
Ocena wytrenowanego modelu
Po zakończeniu procesu trenowania możemy ocenić model w zestawie danych testowych.
# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)
argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
labelCol='label',
metricName='accuracy')
print('Test accuracy:', evaluator.evaluate(pred_df))
Uprzątnij zasoby
Aby upewnić się, że instancja platformy Spark jest wyłączona, zakończ wszystkie połączone sesje i notatniki. Zasób zostanie zamknięty, gdy w puli Apache Spark zostanie osiągnięty określony czas bezczynności. Możesz również wybrać pozycję Zatrzymaj sesję na pasku stanu w prawym górnym rogu notesu.