Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Horovod to rozproszona struktura szkoleniowa dla bibliotek, takich jak TensorFlow i PyTorch. Dzięki platformie Horovod użytkownicy mogą skalować istniejący skrypt szkoleniowy w górę, aby działał na setkach procesorów GPU w kilku wierszach kodu.
W usłudze Azure Synapse Analytics użytkownicy mogą szybko rozpocząć pracę z platformą Horovod przy użyciu domyślnego środowiska uruchomieniowego platformy Apache Spark 3. W przypadku aplikacji potoków Spark ML korzystających z biblioteki TensorFlow użytkownicy mogą używać HorovodRunner. Ten notatnik używa ramki danych platformy Apache Spark do przeprowadzania rozproszonego szkolenia modelu sieci neuronowej (DNN) na zestawie danych MNIST. W tym samouczku użyto biblioteki TensorFlow i elementu HorovodRunner , aby uruchomić proces trenowania.
Wymagania wstępne
- Obszar roboczy Azure Synapse Analytics z kontem usługi Azure Data Lake Storage Gen2 skonfigurowanym jako magazyn domyślny. Musisz być kontrybutorem danych obiektu blob usługi Storage w systemie plików usługi Data Lake Storage Gen2, z którym pracujesz.
- Utwórz pulę Apache Spark z obsługą GPU w obszarze roboczym usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz Tworzenie puli platformy Apache Spark z obsługą procesora GPU w usłudze Azure Synapse. Na potrzeby tego samouczka zalecamy użycie dużego rozmiaru klastra GPU z 3 węzłami.
Uwaga
Wersja zapoznawcza pul z obsługą GPU dla Azure Synapse przestała być wspierana.
Konfigurowanie sesji platformy Apache Spark
Na początku sesji musimy skonfigurować kilka ustawień platformy Apache Spark. W większości przypadków musimy ustawić tylko wartości numExecutors i spark.rapids.memory.gpu.reserve. W przypadku bardzo dużych modeli użytkownicy mogą również wymagać skonfigurowania ustawienia spark.kryoserializer.buffer.max. W przypadku modeli TensorFlow użytkownicy muszą ustawić spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH wartość true.
W tym przykładzie możesz zobaczyć, jak konfiguracje platformy Spark można przekazać za %%configure pomocą polecenia . Szczegółowe znaczenie każdego parametru wyjaśniono w dokumentacji konfiguracji platformy Apache Spark. Podane wartości to sugerowane wartości najlepszych rozwiązań dla dużych pul procesora GPU usługi Azure Synapse.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Na potrzeby tego samouczka użyjemy następujących konfiguracji:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
Uwaga
Podczas trenowania z Horovod użytkownicy powinni ustawić konfigurację Spark na numExecutors, aby była mniejsza lub równa liczbie węzłów.
Konfigurowanie podstawowego konta magazynowego
Potrzebujemy konta usługi Azure Data Lake Storage (ADLS) do przechowywania danych pośrednich i modelowych. Jeśli używasz alternatywnego konta przechowywania, pamiętaj, aby skonfigurować połączoną usługę, aby automatycznie uwierzytelniała i odczytywała dane z konta.
W tym przykładzie odczytujemy dane z podstawowego konta magazynowego usługi Azure Synapse Analytics. Aby odczytać wyniki, należy zmodyfikować następujące właściwości: remote_url.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
Przygotowywanie zestawu danych
Następnie przygotujemy zestaw danych do trenowania. W tym samouczku użyjemy zestawu danych MNIST z usługi Azure Open Datasets.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
Definiowanie modelu sieci neuronowej (DNN)
Po przetworzeniu zestawu danych możemy zdefiniować nasz model TensorFlow. Ten sam kod może również służyć do trenowania modelu TensorFlow z jednym węzłem.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
Zdefiniować funkcję trenowania dla pojedynczego węzła
Najpierw trenujemy nasz model TensorFlow w węźle sterownika puli Apache Spark. Po zakończeniu procesu trenowania oceniamy model i wyświetlamy wyniki utraty i dokładności.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
Migrowanie do modułu HorovodRunner na potrzeby trenowania rozproszonego
Następnie przyjrzymy się, jak ten sam kod można ponownie uruchomić na potrzeby trenowania rozproszonego przy użyciu HorovodRunner.
Definiowanie funkcji trenowania
Aby wytrenować model, najpierw zdefiniujemy funkcję szkoleniową dla elementu HorovodRunner.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
Uruchamianie szkolenia
Po zdefiniowaniu modelu możemy uruchomić proces trenowania.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
Zapisz punkty kontrolne do magazynu ADLS
Kod przedstawia sposób zapisywania punktów kontrolnych na koncie usługi Azure Data Lake Storage (ADLS).
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Ocena modelu wytrenowanego przez Horovod
Po zakończeniu trenowania modelu możemy przyjrzeć się utracie i dokładności końcowego modelu.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
Czyszczenie zasobów
Aby upewnić się, że instancja platformy Spark jest wyłączona, zakończ wszystkie połączone sesje i notatniki. Pula zostanie zamknięta po osiągnięciu czasu bezczynności określonego w puli platformy Apache Spark. Możesz również wybrać pozycję Zatrzymaj sesję na pasku stanu w prawym górnym rogu notesu.