مشاركة عبر


البرنامج التعليمي: التدريب الموزع باستخدام Horovod Runner وTensorFlow (مهمل)

Horovod هو إطار عمل تدريبي موزع لمكتبات مثل TensorFlow وPyTorch. باستخدام Horovod، يمكن للمستخدمين توسيع نطاق برنامج نصي للتدريب موجود للتشغيل على مئات وحدات معالجة الرسومات في بضعة أسطر فقط من التعليمات البرمجية.

داخل Azure Synapse Analytics، يمكن للمستخدمين البدء بسرعة في Horovod باستخدام وقت تشغيل Apache Spark 3 الافتراضي. بالنسبة لتطبيقات البنية الأساسية لبرنامج ربط العمليات التجارية ل Spark ML باستخدام TensorFlow، يمكن للمستخدمين استخدام HorovodRunner. يستخدم دفتر الملاحظات هذا إطار بيانات Apache Spark لإجراء تدريب موزع لنموذج شبكة عصبية موزعة (DNN) على مجموعة بيانات MNIST. يستخدم هذا البرنامج التعليمي TensorFlow و HorovodRunner لتشغيل عملية التدريب.

المتطلبات الأساسية

  • مساحة عمل Azure Synapse Analytics مع حساب تخزين Azure Data Lake Storage Gen2 الذي تم تكوينه كمخزن افتراضي. يجب أن تكون Storage Blob Data Contributor لنظام ملفات Data Lake Storage Gen2 التي تعمل معها.
  • قم بإنشاء تجمع Apache Spark ممكّن بواسطة GPU في مساحة عمل تحليلات Azure Synapse. للحصول على التفاصيل، راجع إنشاء تجمع Apache Spark ممكّن بواسطة GPU في Azure Synapse. بالنسبة لهذا البرنامج التعليمي، نقترح استخدام حجم نظام مجموعة GPU-Large مع 3 عقد.

إشعار

تم الآن إهمال معاينة التجمعات الممكنة ل Azure Synapse GPU.

تكوين جلسة Apache Spark

في بداية الجلسة، نحتاج إلى تكوين بعض إعدادات Apache Spark. في معظم الحالات، نحتاج فقط إلى تعيين numExecutors و spark.rapids.memory.gpu.reserve. بالنسبة للنماذج الكبيرة جدًا، قد يحتاج المستخدمون أيضًا إلى تكوين الإعداد spark.kryoserializer.buffer.max. بالنسبة لنماذج TensorFlow، يحتاج المستخدمون إلى تعيين spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH ليكون صحيحا.

في المثال، يمكنك أن ترى كيف يمكن تمرير تكوينات Spark باستخدام %%configure الأمر . يتم شرح المعنى التفصيلي لكل معلمة في وثائق تكوين Apache Spark. القيم المقدمة هي قيم أفضل الممارسات المقترحة لتجمعات Azure Synapse GPU الكبيرة.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

لهذا البرنامج التعليمي، نستخدم التكوينات التالية:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

إشعار

عند التدريب باستخدام Horovod، يجب على المستخدمين تعيين تكوين Spark ليكون numExecutors أقل أو مساويًا لعدد العقد.

إعداد حساب التخزين الأساسي

نحن بحاجة إلى حساب Azure Data Lake Storage (ADLS) لتخزين البيانات المتوسطة والنموذجية. إذا كنت تستخدم حساب تخزين بديل، تأكد من إعداد الخدمة المرتبطة للمصادقة والقراءة تلقائيًا من الحساب.

في هذا المثال، نقرأ البيانات من حساب تخزين Azure Synapse Analytics الأساسي. لقراءة النتائج، تحتاج إلى تعديل الخصائص التالية: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

إعداد مجموعة البيانات

بعد ذلك، نقوم بإعداد مجموعة البيانات للتدريب. في هذا البرنامج التعليمي، نستخدم مجموعة بيانات MNIST من Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

تعريف نموذج DNN

بمجرد معالجة مجموعة البيانات الخاصة بنا، يمكننا تحديد نموذج TensorFlow. يمكن أيضا استخدام نفس التعليمات البرمجية لتدريب نموذج TensorFlow أحادي العقدة.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

تعريف دالة تدريب لعقدة واحدة

أولا، نقوم بتدريب نموذج TensorFlow الخاص بنا على عقدة برنامج التشغيل لتجمع Apache Spark. بمجرد اكتمال عملية التدريب، نقوم بتقييم النموذج وطباعة درجات الخسارة والدقة.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

الترحيل إلى HorovodRunner للتدريب الموزع

بعد ذلك، سنلقي نظرة على كيفية إعادة تشغيل نفس التعليمات البرمجية باستخدام HorovodRunner للتدريب الموزع.

تعريف دالة التدريب

لتدريب نموذج، نقوم أولا بتعريف وظيفة تدريب ل HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

تشغيل التدريب

بمجرد تحديد النموذج، يمكننا تشغيل عملية التدريب.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

حفظ نقاط التحقق في تخزين ADLS

توضح التعليمات البرمجية كيفية حفظ نقاط التحقق في حساب Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

تقييم نموذج Horovod المدرب

بمجرد اكتمال تدريب النموذج، يمكننا بعد ذلك إلقاء نظرة على الخسارة والدقة للنموذج النهائي.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

تنظيف الموارد

لضمان إيقاف تشغيل مثيل Spark، قم بإنهاء أي جلسات عمل متصلة (دفاتر ملاحظات). يتم إيقاف تشغيل التجمع عند الوصول إلى وقت الخمول المحدد في تجمع Apache Spark. يمكنك أيضا تحديد "stop session" من شريط المعلومات في أسفل دفتر الملاحظات.

لقطة شاشة تعرض زر إيقاف الجلسة على شريط الحالة.

الخطوات التالية