Aracılığıyla paylaş


Öğretici: Horovod Estimator ve PyTorch ile Dağıtılmış Eğitim (Önizleme)

Horovod , TensorFlow ve PyTorch gibi kitaplıklar için dağıtılmış bir eğitim çerçevesidir. Horovod ile kullanıcılar mevcut bir eğitim betiğinin ölçeğini artırarak yalnızca birkaç satır kodda yüzlerce GPU üzerinde çalışabilir.

Azure Synapse Analytics'in içinde kullanıcılar, varsayılan Apache Spark 3 çalışma zamanını kullanarak Horovod kullanmaya hızlı bir şekilde başlayabilir. PyTorch kullanan Spark ML işlem hattı uygulamaları için kullanıcılar horovod.spark tahmin aracı API'sini kullanabilir. Bu not defteri, MNIST veri kümesinde dağıtılmış bir sinir ağı (DNN) modelinin dağıtılmış eğitimini gerçekleştirmek için apache Spark veri çerçevesini kullanır. Bu öğreticide eğitim sürecini çalıştırmak için PyTorch ve Horovod Estimator kullanılır.

Önkoşullar

Uyarı

  • GPU hızlandırılmış önizlemesi Apache Spark 3.2 (Destek Sonu duyuruldu) çalışma zamanıyla sınırlıdır. Apache Spark 3.2 için Azure Synapse Runtime desteğinin sona erdiği 8 Temmuz 2023'te duyuruldu. Destek Sonu duyurulu çalışma zamanlarında hata ve özellik düzeltmeleri olmayacaktır. Güvenlik düzeltmeleri, risk değerlendirmesine göre geri aktarılacaktır. Spark 3.2'de bu çalışma zamanı ve ilgili GPU hızlandırılmış önizlemesi 8 Temmuz 2024 itibarıyla kullanımdan kaldırılacak ve devre dışı bırakılacaktır.
  • GPU hızlandırılmış önizlemesi artık Azure Synapse 3.1 (desteklenmeyen) çalışma zamanında desteklenmiyor. Apache Spark 3.1 için Azure Synapse Runtime, 26 Ocak 2023 itibarıyla Destek Sonuna ulaşmıştır ve resmi destek 26 Ocak 2024 tarihinden itibaren sona ermiştir ve bu tarihten sonra destek biletlerini, hata düzeltmelerini veya güvenlik güncelleştirmelerini ele almamaktadır.

Apache Spark oturumunu yapılandırma

Oturumun başında birkaç Apache Spark ayarı yapılandırmamız gerekir. Çoğu durumda yalnızca numExecutors ve spark.rapids.memory.gpu.reserve değerlerini ayarlamamız gerekir. Büyük modeller için kullanıcıların da ayarı yapılandırması spark.kryoserializer.buffer.max gerekebilir. Tensorflow modellerinde kullanıcıların değerini true olarak ayarlaması spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH gerekir.

Örnekte Spark yapılandırmalarının komutuyla %%configure nasıl geçirilebileceğini görebilirsiniz. Her parametrenin ayrıntılı anlamı Apache Spark yapılandırma belgelerinde açıklanmıştır. Sağlanan değerler, Azure Synapse GPU büyük havuzları için önerilen, en iyi yöntem değerleridir.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Bu öğretici için aşağıdaki yapılandırmaları kullanacağız:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Not

Horovod ile eğitim yaparken kullanıcıların Spark yapılandırmasını numExecutors düğüm sayısına eşit veya daha az olacak şekilde ayarlamaları gerekir.

Bağımlılıkları içeri aktarma

Bu öğreticide, veri kümesini okumak ve işlemek için PySpark'ı kullanacağız. Ardından, dağıtılmış sinir ağı (DNN) modelini oluşturmak ve eğitim sürecini çalıştırmak için PyTorch ve Horovod kullanırız. Başlamak için aşağıdaki bağımlılıkları içeri aktarmamız gerekir:

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

Alternatif depolama hesabına Bağlan

Ara ve model verilerini depolamak için Azure Data Lake Depolama (ADLS) hesabına ihtiyacımız var. Alternatif bir depolama hesabı kullanıyorsanız, bağlı hizmeti otomatik olarak kimlik doğrulaması yapmak ve hesaptan okumak için ayarladığınızdan emin olun. Ayrıca, şu özellikleri değiştirmeniz gerekir: remote_url, account_nameve linked_service_name.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

Veri kümesini hazırlama

Ardından veri kümesini eğitim için hazırlayacağız. Bu öğreticide Azure Open Datasets'teki MNIST veri kümesini kullanacağız.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Apache Spark ile verileri işleme

Şimdi bir Apache Spark veri çerçevesi oluşturacağız. Bu veri çerçevesi ile eğitim HorovodEstimator için kullanılacaktır.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

DNN modelini tanımlama

Veri kümemizi işlemeyi tamamladıktan sonra Artık PyTorch modelimizi tanımlayabiliriz. Aynı kod, tek düğümlü bir PyTorch modelini eğitmek için de kullanılabilir.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

Modeli eğitme

Artık Apache Spark veri çerçevemizin üzerinde horovod Spark tahmin aracı eğitebiliriz.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

Eğitilen modeli değerlendirme

Eğitim işlemi tamamlandıktan sonra test veri kümesindeki modeli değerlendirebiliriz.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

Kaynakları temizleme

Spark örneğinin kapalı olduğundan emin olmak için bağlı oturumları (not defterlerini) sonlandırın. Apache Spark havuzunda belirtilen boşta kalma süresine ulaşıldığında havuz kapatılır. Not defterinin sağ üst kısmındaki durum çubuğunda oturumu durdur'u da seçebilirsiniz.

Durum çubuğunda oturumu durdur düğmesini gösteren ekran görüntüsü.

Sonraki adımlar