자습서: Horovod 예측 도구 및 PyTorch를 사용한 분산 학습(사용되지 않음)

Horovod는 TensorFlow 및 PyTorch와 같은 라이브러리를 위한 분산 학습 프레임워크입니다. Horovod를 사용하면 사용자가 몇 줄의 코드만으로 수백 개의 GPU에서 실행되도록 기존 학습 스크립트를 스케일 업할 수 있습니다.

Azure Synapse Analytics 내에서 사용자는 기본 Apache Spark 3 런타임을 사용하여 Horovod를 빠르게 시작할 수 있습니다. PyTorch를 사용하는 Spark ML 파이프라인 애플리케이션의 경우 사용자는 horovod.spark 예측 도구 API를 사용할 수 있습니다. 이 Notebook은 Apache Spark 데이터 프레임을 사용하여 MNIST 데이터 세트에서 DNN(분산 신경망) 모델의 분산 학습을 수행합니다. 이 자습서에서는 PyTorch와 Horovod 예측 도구를 사용하여 학습 프로세스를 실행합니다.

필수 조건

  • Azure Synapse Analytics 작업 영역(기본 스토리지로 구성된 Azure Data Lake Storage Gen2 스토리지 계정이 있음). 사용하는 Data Lake Storage Gen2 파일 시스템의 Storage Blob 데이터 기여자여야 합니다.
  • Azure Synapse Analytics 작업 영역에서 GPU 지원 Apache Spark 풀을 만듭니다. 자세한 내용은 Azure Synapse에서 GPU 지원 Apache Spark 풀 만들기를 참조하세요. 이 자습서에서는 노드가 3개인 GPU-Large 클러스터 크기를 사용하는 것이 좋습니다.

참고 항목

Azure Synapse GPU 사용 풀에 대한 미리 보기는 더 이상 사용되지 않습니다.

Apache Spark 세션 구성

세션 시작 시 몇 가지 Apache Spark 설정을 구성해야 합니다. 대부분의 경우 numExecutors 및 spark.rapids.memory.gpu.reserve만 설정하면 됩니다. 대형 모델의 경우 사용자가 spark.kryoserializer.buffer.max 설정을 구성해야 할 수도 있습니다. TensorFlow 모델의 경우 사용자는 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH를 true로 설정해야 합니다.

이 예에서는 %%configure 명령을 사용하여 Spark 구성을 전달하는 방법을 확인할 수 있습니다. 각 매개 변수의 자세한 의미는 Apache Spark 구성 설명서에 설명되어 있습니다. 제공된 값은 Azure Synapse GPU 대규모 풀에 대해 권장되는 모범 사례 값입니다.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

이 자습서에서는 다음 구성을 사용합니다.


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

참고 항목

Horovod를 사용하여 학습할 때 사용자는 numExecutors에 대한 Spark 구성을 노드 수보다 작거나 같도록 설정해야 합니다.

종속성 가져오기

이 자습서에서는 PySpark를 사용하여 데이터 세트를 읽고 처리합니다. 그런 다음 PyTorch와 Horovod를 사용하여 DNN(분산 신경망) 모델을 빌드하고 학습 프로세스를 실행합니다. 시작하려면 다음 종속성을 가져와야 합니다.

# base libs
import sys
import uuid

# numpy
import numpy as np

# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

# azure related
from azure.synapse.ml.horovodutils import AdlsStore

대체 스토리지 계정에 연결

중간 및 모델 데이터를 저장하려면 ADLS(Azure Data Lake Storage) 계정이 필요합니다. 대체 스토리지 계정을 사용하는 경우 계정에서 자동으로 인증하고 읽도록 연결된 서비스를 설정해야 합니다. 또한 remote_url, account_namelinked_service_name 속성을 수정해야 합니다.

num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01  # learning rate for single node code

uuid_str = str(uuid.uuid4())  # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str

# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir

store = AdlsStore.create(adls_store_path,
                         storage_options={
                             'account_name': account_name,
                             'sas_token': sas_token
                         },
                         save_runs=True)

print(adls_store_path)

데이터 세트 준비

다음으로 학습을 위해 데이터 세트를 준비합니다. 이 자습서에서는 Azure Open Datasets의 MNIST 데이터 세트를 사용합니다.

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST

mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()

# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()

Apache Spark를 사용하여 데이터 처리

이제 Apache Spark 데이터 프레임을 만듭니다. 이 데이터 프레임은 학습을 위해 HorovodEstimator와 함께 사용됩니다.

# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)

# repartition DataFrame for training
train_df = df.repartition(num_proc)

# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])

# show the dataset
train_df.show()
train_df.count()

DNN 모델 정의

데이터 세트 처리가 완료되면 이제 PyTorch 모델을 정의할 수 있습니다. 동일한 코드를 사용하여 단일 노드 PyTorch 모델을 학습시킬 수도 있습니다.

# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = x.float()
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(),
                      lr=lr_single_node * num_proc,
                      momentum=0.5)  # notice the lr is scaled up
loss = nn.NLLLoss()

모델 학습

이제 Apache Spark 데이터 프레임 위에 Horovod Spark 추정기를 학습할 수 있습니다.

# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       prefix_output_with_timestamp=True)

torch_estimator = hvd.TorchEstimator(
    backend=backend,
    store=store,
    partitions_per_process=1,  # important for GPU training
    model=model,
    optimizer=optimizer,
    loss=lambda input, target: loss(input, target.long()),
    input_shapes=[[-1, 1, 28, 28]],
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=batch_size,
    epochs=epochs,
    validation=0.1,
    verbose=2)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])

학습된 모델 평가

학습 프로세스가 완료되면 테스트 데이터 세트에서 모델을 평가할 수 있습니다.

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
                                              labelCol='label',
                                              metricName='accuracy')

print('Test accuracy:', evaluator.evaluate(pred_df))

리소스 정리

Spark 인스턴스가 종료되도록 하려면 연결된 세션(Notebook)을 종료합니다. Apache Spark 풀에 지정된 유휴 시간에 도달하면 풀이 종료됩니다. Notebook 오른쪽 상단에 있는 상태 표시줄에서 세션 중지를 선택할 수도 있습니다.

상태 표시줄의 세션 중지 단추를 보여주는 스크린샷입니다.

다음 단계