Aracılığıyla paylaş


Öğretici: Horovod Runner ve TensorFlow ile Dağıtılmış Eğitim (Önizleme)

Horovod , TensorFlow ve PyTorch gibi kitaplıklar için dağıtılmış bir eğitim çerçevesidir. Horovod ile kullanıcılar mevcut bir eğitim betiğinin ölçeğini artırarak yalnızca birkaç satır kodda yüzlerce GPU üzerinde çalışabilir.

Azure Synapse Analytics'in içinde kullanıcılar, varsayılan Apache Spark 3 çalışma zamanını kullanarak Horovod kullanmaya hızlı bir şekilde başlayabilir. TensorFlow kullanan Spark ML işlem hattı uygulamaları için kullanıcılar kullanabilir HorovodRunner. Bu not defteri, MNIST veri kümesinde dağıtılmış bir sinir ağı (DNN) modelinin dağıtılmış eğitimini gerçekleştirmek için apache Spark veri çerçevesini kullanır. Bu öğreticide TensorFlow ve eğitim sürecini çalıştırmak için kullanılır HorovodRunner .

Önkoşullar

Uyarı

  • GPU hızlandırılmış önizlemesi Apache Spark 3.2 (Destek Sonu duyuruldu) çalışma zamanıyla sınırlıdır. Apache Spark 3.2 için Azure Synapse Runtime desteğinin sona erdiği 8 Temmuz 2023'te duyuruldu. Destek Sonu duyurulu çalışma zamanlarında hata ve özellik düzeltmeleri olmayacaktır. Güvenlik düzeltmeleri, risk değerlendirmesine göre geri aktarılacaktır. Spark 3.2'de bu çalışma zamanı ve ilgili GPU hızlandırılmış önizlemesi 8 Temmuz 2024 itibarıyla kullanımdan kaldırılacak ve devre dışı bırakılacaktır.
  • GPU hızlandırılmış önizlemesi artık Azure Synapse 3.1 (desteklenmeyen) çalışma zamanında desteklenmiyor. Apache Spark 3.1 için Azure Synapse Runtime, 26 Ocak 2023 itibarıyla Destek Sonuna ulaşmıştır ve resmi destek 26 Ocak 2024 tarihinden itibaren sona ermiştir ve bu tarihten sonra destek biletlerini, hata düzeltmelerini veya güvenlik güncelleştirmelerini ele almamaktadır.

Apache Spark oturumunu yapılandırma

Oturumun başında birkaç Apache Spark ayarı yapılandırmamız gerekir. Çoğu durumda, yalnızca ve spark.rapids.memory.gpu.reserve'yi numExecutors ayarlamamız gerekir. Çok büyük modeller için kullanıcıların ayarı da yapılandırması spark.kryoserializer.buffer.max gerekebilir. TensorFlow modellerinde kullanıcıların değerini true olarak ayarlaması spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH gerekir.

Örnekte Spark yapılandırmalarının komutuyla %%configure nasıl geçirilebileceğini görebilirsiniz. Her parametrenin ayrıntılı anlamı Apache Spark yapılandırma belgelerinde açıklanmıştır. Sağlanan değerler, Azure Synapse GPU büyük havuzları için önerilen, en iyi yöntem değerleridir.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Bu öğretici için aşağıdaki yapılandırmaları kullanırız:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Not

Horovod ile eğitim yaparken kullanıcıların Spark yapılandırmasını numExecutors düğüm sayısına eşit veya daha az olacak şekilde ayarlamaları gerekir.

Birincil depolama hesabını ayarlama

Ara ve model verilerini depolamak için Azure Data Lake Depolama (ADLS) hesabına ihtiyacımız var. Alternatif bir depolama hesabı kullanıyorsanız, bağlı hizmeti otomatik olarak kimlik doğrulaması yapmak ve hesaptan okumak için ayarladığınızdan emin olun.

Bu örnekte, birincil Azure Synapse Analytics depolama hesabından verileri okuyoruz. Sonuçları okumak için aşağıdaki özellikleri değiştirmeniz gerekir: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Veri kümesini hazırlama

Ardından, veri kümesini eğitim için hazırlarız. Bu öğreticide Azure Açık Veri Kümeleri'ndeki MNIST veri kümesini kullanacağız.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

DNN modelini tanımlama

Veri kümemiz işlendikten sonra TensorFlow modelimizi tanımlayabiliriz. Aynı kod tek düğümlü tensorflow modelini eğitmek için de kullanılabilir.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Tek bir düğüm için eğitim işlevi tanımlama

İlk olarak, Apache Spark havuzunun sürücü düğümünde TensorFlow modelimizi eğiteceğiz. Eğitim süreci tamamlandıktan sonra modeli değerlendirir ve kayıp ve doğruluk puanlarını yazdırırız.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Dağıtılmış eğitim için HorovodRunner'a geçiş

Ardından, aynı kodun dağıtılmış eğitim için kullanılarak HorovodRunner nasıl yeniden çalıştırılabildiğine göz atacağız.

Eğitim işlevini tanımlama

Modeli eğitmek için öncelikle için HorovodRunnerbir eğitim işlevi tanımlayacağız.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Eğitimi çalıştırma

Model tanımlandıktan sonra eğitim sürecini çalıştırabiliriz.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Denetim noktalarını ADLS depolama alanına kaydetme

Kod, denetim noktalarının Azure Data Lake Depolama (ADLS) hesabına nasıl kaydedileceklerini gösterir.

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Horovod eğitilmiş modelini değerlendirme

Model eğitimi tamamlandıktan sonra, son modelin kaybına ve doğruluğuna göz atabiliriz.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Kaynakları temizleme

Spark örneğinin kapalı olduğundan emin olmak için bağlı oturumları (not defterlerini) sonlandırın. Apache Spark havuzunda belirtilen boşta kalma süresine ulaşıldığında havuz kapatılır. Not defterinin sağ üst kısmındaki durum çubuğunda oturumu durdur'u da seçebilirsiniz.

Durum çubuğunda oturumu durdur düğmesini gösteren ekran görüntüsü.

Sonraki adımlar