إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
Horovod هو إطار عمل تدريبي موزع لمكتبات مثل TensorFlow وPyTorch. باستخدام Horovod، يمكن للمستخدمين توسيع نطاق برنامج نصي للتدريب موجود للتشغيل على مئات وحدات معالجة الرسومات في بضعة أسطر فقط من التعليمات البرمجية.
داخل Azure Synapse Analytics، يمكن للمستخدمين البدء بسرعة في Horovod باستخدام وقت تشغيل Apache Spark 3 الافتراضي. بالنسبة لتطبيقات البنية الأساسية لبرنامج ربط العمليات التجارية ل Spark ML باستخدام TensorFlow، يمكن للمستخدمين استخدام HorovodRunner. يستخدم دفتر الملاحظات هذا إطار بيانات Apache Spark لإجراء تدريب موزع لنموذج شبكة عصبية موزعة (DNN) على مجموعة بيانات MNIST. يستخدم هذا البرنامج التعليمي TensorFlow و HorovodRunner لتشغيل عملية التدريب.
المتطلبات الأساسية
- مساحة عمل Azure Synapse Analytics مع حساب تخزين Azure Data Lake Storage Gen2 الذي تم تكوينه كمخزن افتراضي. يجب أن تكون Storage Blob Data Contributor لنظام ملفات Data Lake Storage Gen2 التي تعمل معها.
- قم بإنشاء تجمع Apache Spark ممكّن بواسطة GPU في مساحة عمل تحليلات Azure Synapse. للحصول على التفاصيل، راجع إنشاء تجمع Apache Spark ممكّن بواسطة GPU في Azure Synapse. بالنسبة لهذا البرنامج التعليمي، نقترح استخدام حجم نظام مجموعة GPU-Large مع 3 عقد.
إشعار
تم الآن إهمال معاينة التجمعات الممكنة ل Azure Synapse GPU.
تكوين جلسة Apache Spark
في بداية الجلسة، نحتاج إلى تكوين بعض إعدادات Apache Spark. في معظم الحالات، نحتاج فقط إلى تعيين numExecutors و spark.rapids.memory.gpu.reserve. بالنسبة للنماذج الكبيرة جدًا، قد يحتاج المستخدمون أيضًا إلى تكوين الإعداد spark.kryoserializer.buffer.max. بالنسبة لنماذج TensorFlow، يحتاج المستخدمون إلى تعيين spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH ليكون صحيحا.
في المثال، يمكنك أن ترى كيف يمكن تمرير تكوينات Spark باستخدام %%configure الأمر . يتم شرح المعنى التفصيلي لكل معلمة في وثائق تكوين Apache Spark. القيم المقدمة هي قيم أفضل الممارسات المقترحة لتجمعات Azure Synapse GPU الكبيرة.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
لهذا البرنامج التعليمي، نستخدم التكوينات التالية:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
إشعار
عند التدريب باستخدام Horovod، يجب على المستخدمين تعيين تكوين Spark ليكون numExecutors أقل أو مساويًا لعدد العقد.
إعداد حساب التخزين الأساسي
نحن بحاجة إلى حساب Azure Data Lake Storage (ADLS) لتخزين البيانات المتوسطة والنموذجية. إذا كنت تستخدم حساب تخزين بديل، تأكد من إعداد الخدمة المرتبطة للمصادقة والقراءة تلقائيًا من الحساب.
في هذا المثال، نقرأ البيانات من حساب تخزين Azure Synapse Analytics الأساسي. لقراءة النتائج، تحتاج إلى تعديل الخصائص التالية: remote_url.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
إعداد مجموعة البيانات
بعد ذلك، نقوم بإعداد مجموعة البيانات للتدريب. في هذا البرنامج التعليمي، نستخدم مجموعة بيانات MNIST من Azure Open Datasets.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
تعريف نموذج DNN
بمجرد معالجة مجموعة البيانات الخاصة بنا، يمكننا تحديد نموذج TensorFlow. يمكن أيضا استخدام نفس التعليمات البرمجية لتدريب نموذج TensorFlow أحادي العقدة.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
تعريف دالة تدريب لعقدة واحدة
أولا، نقوم بتدريب نموذج TensorFlow الخاص بنا على عقدة برنامج التشغيل لتجمع Apache Spark. بمجرد اكتمال عملية التدريب، نقوم بتقييم النموذج وطباعة درجات الخسارة والدقة.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
الترحيل إلى HorovodRunner للتدريب الموزع
بعد ذلك، سنلقي نظرة على كيفية إعادة تشغيل نفس التعليمات البرمجية باستخدام HorovodRunner للتدريب الموزع.
تعريف دالة التدريب
لتدريب نموذج، نقوم أولا بتعريف وظيفة تدريب ل HorovodRunner.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
تشغيل التدريب
بمجرد تحديد النموذج، يمكننا تشغيل عملية التدريب.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
حفظ نقاط التحقق في تخزين ADLS
توضح التعليمات البرمجية كيفية حفظ نقاط التحقق في حساب Azure Data Lake Storage (ADLS).
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
تقييم نموذج Horovod المدرب
بمجرد اكتمال تدريب النموذج، يمكننا بعد ذلك إلقاء نظرة على الخسارة والدقة للنموذج النهائي.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
تنظيف الموارد
لضمان إيقاف تشغيل مثيل Spark، قم بإنهاء أي جلسات عمل متصلة (دفاتر ملاحظات). يتم إيقاف تشغيل التجمع عند الوصول إلى وقت الخمول المحدد في تجمع Apache Spark. يمكنك أيضا تحديد "stop session" من شريط المعلومات في أسفل دفتر الملاحظات.