共用方式為


教學課程:使用 Horovod 執行器和 Tensorflow 的分散式定型 (已淘汰)

Horovod 為分散式定型架構,適用於 TensorFlow 和 PyTorch 這類程式庫。 透過 Horovod,使用者可相應增加現有的定型指令碼,只要幾行程式碼就能在數百個 GPU 上執行。

在 Azure Synapse Analytics 中,使用者可以使用預設的 Apache Spark 3 執行階段快速開始使用 Horovod。 針對使用 Tensorflow 的 Spark ML 管線應用程式,使用者可以使用 HorovodRunner。 此筆記本會使用 Apache Spark 資料框架,在 MNIST 資料集上執行分散式神經網路 (DNN) 模型的分散式定型。 本教學課程會使用 TensorFlow 和 HorovodRunner 來執行定型程序。

必要條件

  • Azure Synapse Analytics 工作區,其中 Azure Data Lake Storage Gen2 儲存體帳戶已設定為預設儲存體。 使用 Data Lake Storage Gen2 檔案系統時,您必須是該檔案系統的儲存體 Blob 資料參與者
  • 在 Azure Synapse Analytics 工作區中啟用 GPU 的 Apache Spark 集區。 如需詳細資訊,請參閱在 Azure Synapse 中建立已啟用 GPU 的 Apache Spark 集區。 在本教學課程中,我們建議使用具有 3 個節點 GPU-Large 叢集大小。

注意

Azure Synapse 啟用 GPU 預覽功能集區現已取代。

設定 Apache Spark 工作階段

在工作階段開始時,我們必須設定一些 Apache Spark 設定。 在大部分情況下,我們只需要設定 numExecutorsspark.rapids.memory.gpu.reserve。 對於非常大型的模型,使用者可能也須進行 spark.kryoserializer.buffer.max 設定。 對於 Tensorflow 模型,使用者必須將 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH 設為 true。

在範例中,您可以看到如何使用 %%configure 命令傳遞 Spark 設定。 Apache Spark 設定文件中會說明每個參數的詳細意義。 所提供的值是 Azure Synapse GPU-large 集區的建議最佳做法值。


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

在本教學課程中,我們會使用下列設定:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

注意

使用 Horovod 進行定型時,使用者應將 numExecutors 的 Spark 設定設為小於或等於節點數目。

設定主要儲存體帳戶

我們需要 Azure Data Lake Storage (ADLS) 帳戶來儲存中繼和模型資料。 若使用替代的儲存體帳戶,請務必將連結的服務設為從帳戶自動驗證和讀取。

在此範例中,我們會從主要 Azure Synapse Analytics 儲存體帳戶中讀取資料。 若要讀取結果,您需要修改下列屬性:remote_url

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

準備資料集

接下來,我們會準備資料集以進行定型。 在本教學課程中,我們會使用來自 Azure 開放資料集的 MNIST 資料集。

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

定義 DNN 模型

處理資料集之後,我們可以定義 TensorFlow 模型。 相同的程式碼也可以用來定型單一節點 TensorFlow 模型。

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

定義單一節點的定型函數

首先,我們會在 Apache Spark 集區的驅動程式節點上定型 Tensorflow 模型。 完成程序的定型之後,我們會評估模型並列印遺失和正確性分數。


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

遷移至 HorovodRunner 以進行分散式定型

接下來,我們將探討如何使用 HorovodRunner 重新執行相同的程式碼來進行分散式定型。

定義定型函數

若要將模型定型,我們會先定義 HorovodRunner 的定型函數。

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

執行定型

定義模型之後,我們可以執行程序的定型。

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

將檢查點儲存至 ADLS 儲存體

程式碼示範如何將檢查點儲存至 Azure Data Lake Storage (ADLS) 帳戶。

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

評估 Horovod 定型模型

完成模型的定型之後,就可以查看最終模型的遺失和正確性。

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

清除資源

為確保 Spark 執行個體已關閉,請結束任何已連線的工作階段 (Notebook)。 當達到 Apache Spark 集區中指定的閒置時間時,集區就會關閉。 您也可以在筆記本右上方的狀態列,選取 [停止工作階段]

顯示狀態列上 [停止工作階段] 按鈕的螢幕擷取畫面。

下一步