Öğretici: Horovod Runner ve TensorFlow ile Dağıtılmış Eğitim (Önizleme)
Horovod , TensorFlow ve PyTorch gibi kitaplıklar için dağıtılmış bir eğitim çerçevesidir. Horovod ile kullanıcılar mevcut bir eğitim betiğinin ölçeğini artırarak yalnızca birkaç satır kodda yüzlerce GPU üzerinde çalışabilir.
Azure Synapse Analytics'in içinde kullanıcılar, varsayılan Apache Spark 3 çalışma zamanını kullanarak Horovod kullanmaya hızlı bir şekilde başlayabilir. TensorFlow kullanan Spark ML işlem hattı uygulamaları için kullanıcılar kullanabilir HorovodRunner
. Bu not defteri, MNIST veri kümesinde dağıtılmış bir sinir ağı (DNN) modelinin dağıtılmış eğitimini gerçekleştirmek için apache Spark veri çerçevesini kullanır. Bu öğreticide TensorFlow ve eğitim sürecini çalıştırmak için kullanılır HorovodRunner
.
Önkoşullar
- Varsayılan depolama alanı olarak yapılandırılmış Azure Data Lake Storage 2. Nesil depolama hesabıyla Azure Synapse Analytics çalışma alanı. Birlikte çalıştığınız Data Lake Storage 2. Nesil dosya sisteminin Depolama Blob Veri Katkıda Bulunanı olmanız gerekir.
- Azure Synapse Analytics çalışma alanınızda GPU özellikli bir Apache Spark havuzu oluşturun. Ayrıntılar için bkz . Azure Synapse'te GPU özellikli Apache Spark havuzu oluşturma. Bu öğretici için 3 düğümlü GPU-Large küme boyutunu kullanmanızı öneririz.
Uyarı
- GPU hızlandırılmış önizlemesi Apache Spark 3.2 (Destek Sonu duyuruldu) çalışma zamanıyla sınırlıdır. Apache Spark 3.2 için Azure Synapse Runtime desteğinin sona erdiği 8 Temmuz 2023'te duyuruldu. Destek Sonu duyurulu çalışma zamanlarında hata ve özellik düzeltmeleri olmayacaktır. Güvenlik düzeltmeleri, risk değerlendirmesine göre geri aktarılacaktır. Spark 3.2'de bu çalışma zamanı ve ilgili GPU hızlandırılmış önizlemesi 8 Temmuz 2024 itibarıyla kullanımdan kaldırılacak ve devre dışı bırakılacaktır.
- GPU hızlandırılmış önizlemesi artık Azure Synapse 3.1 (desteklenmeyen) çalışma zamanında desteklenmiyor. Apache Spark 3.1 için Azure Synapse Runtime, 26 Ocak 2023 itibarıyla Destek Sonuna ulaşmıştır ve resmi destek 26 Ocak 2024 tarihinden itibaren sona ermiştir ve bu tarihten sonra destek biletlerini, hata düzeltmelerini veya güvenlik güncelleştirmelerini ele almamaktadır.
Apache Spark oturumunu yapılandırma
Oturumun başında birkaç Apache Spark ayarı yapılandırmamız gerekir. Çoğu durumda, yalnızca ve spark.rapids.memory.gpu.reserve
'yi numExecutors
ayarlamamız gerekir. Çok büyük modeller için kullanıcıların ayarı da yapılandırması spark.kryoserializer.buffer.max
gerekebilir. TensorFlow modellerinde kullanıcıların değerini true olarak ayarlaması spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
gerekir.
Örnekte Spark yapılandırmalarının komutuyla %%configure
nasıl geçirilebileceğini görebilirsiniz. Her parametrenin ayrıntılı anlamı Apache Spark yapılandırma belgelerinde açıklanmıştır. Sağlanan değerler, Azure Synapse GPU büyük havuzları için önerilen, en iyi yöntem değerleridir.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Bu öğretici için aşağıdaki yapılandırmaları kullanırız:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
Not
Horovod ile eğitim yaparken kullanıcıların Spark yapılandırmasını numExecutors
düğüm sayısına eşit veya daha az olacak şekilde ayarlamaları gerekir.
Birincil depolama hesabını ayarlama
Ara ve model verilerini depolamak için Azure Data Lake Depolama (ADLS) hesabına ihtiyacımız var. Alternatif bir depolama hesabı kullanıyorsanız, bağlı hizmeti otomatik olarak kimlik doğrulaması yapmak ve hesaptan okumak için ayarladığınızdan emin olun.
Bu örnekte, birincil Azure Synapse Analytics depolama hesabından verileri okuyoruz. Sonuçları okumak için aşağıdaki özellikleri değiştirmeniz gerekir: remote_url
.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
Veri kümesini hazırlama
Ardından, veri kümesini eğitim için hazırlarız. Bu öğreticide Azure Açık Veri Kümeleri'ndeki MNIST veri kümesini kullanacağız.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
DNN modelini tanımlama
Veri kümemiz işlendikten sonra TensorFlow modelimizi tanımlayabiliriz. Aynı kod tek düğümlü tensorflow modelini eğitmek için de kullanılabilir.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
Tek bir düğüm için eğitim işlevi tanımlama
İlk olarak, Apache Spark havuzunun sürücü düğümünde TensorFlow modelimizi eğiteceğiz. Eğitim süreci tamamlandıktan sonra modeli değerlendirir ve kayıp ve doğruluk puanlarını yazdırırız.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
Dağıtılmış eğitim için HorovodRunner'a geçiş
Ardından, aynı kodun dağıtılmış eğitim için kullanılarak HorovodRunner
nasıl yeniden çalıştırılabildiğine göz atacağız.
Eğitim işlevini tanımlama
Modeli eğitmek için öncelikle için HorovodRunner
bir eğitim işlevi tanımlayacağız.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
Eğitimi çalıştırma
Model tanımlandıktan sonra eğitim sürecini çalıştırabiliriz.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
Denetim noktalarını ADLS depolama alanına kaydetme
Kod, denetim noktalarının Azure Data Lake Depolama (ADLS) hesabına nasıl kaydedileceklerini gösterir.
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Horovod eğitilmiş modelini değerlendirme
Model eğitimi tamamlandıktan sonra, son modelin kaybına ve doğruluğuna göz atabiliriz.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
Kaynakları temizleme
Spark örneğinin kapalı olduğundan emin olmak için bağlı oturumları (not defterlerini) sonlandırın. Apache Spark havuzunda belirtilen boşta kalma süresine ulaşıldığında havuz kapatılır. Not defterinin sağ üst kısmındaki durum çubuğunda oturumu durdur'u da seçebilirsiniz.
Sonraki adımlar
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin