Horovod는 TensorFlow 및 PyTorch와 같은 라이브러리를 위한 분산 학습 프레임워크입니다. Horovod를 사용하면 사용자가 몇 줄의 코드만으로 수백 개의 GPU에서 실행되도록 기존 학습 스크립트를 스케일 업할 수 있습니다.
Azure Synapse Analytics 내에서 사용자는 기본 Apache Spark 3 런타임을 사용하여 Horovod를 빠르게 시작할 수 있습니다. PyTorch를 사용하는 Spark ML 파이프라인 애플리케이션의 경우 사용자는 horovod.spark 예측 도구 API를 사용할 수 있습니다. 이 Notebook은 Apache Spark 데이터 프레임을 사용하여 MNIST 데이터 세트에서 DNN(분산 신경망) 모델의 분산 학습을 수행합니다. 이 자습서에서는 PyTorch와 Horovod 예측 도구를 사용하여 학습 프로세스를 실행합니다.
필수 조건
- Azure Synapse Analytics 작업 영역(기본 스토리지로 구성된 Azure Data Lake Storage Gen2 스토리지 계정이 있음). 사용하는 Data Lake Storage Gen2 파일 시스템의 Storage Blob 데이터 기여자여야 합니다.
- Azure Synapse Analytics 작업 영역에서 GPU 지원 Apache Spark 풀을 만듭니다. 자세한 내용은 Azure Synapse에서 GPU 지원 Apache Spark 풀 만들기를 참조하세요. 이 자습서에서는 노드가 3개인 GPU-Large 클러스터 크기를 사용하는 것이 좋습니다.
참고 항목
Azure Synapse GPU 사용 풀에 대한 미리 보기는 더 이상 사용되지 않습니다.
Apache Spark 세션 구성
세션 시작 시 몇 가지 Apache Spark 설정을 구성해야 합니다. 대부분의 경우 numExecutors 및 spark.rapids.memory.gpu.reserve만 설정하면 됩니다. 대형 모델의 경우 사용자가 spark.kryoserializer.buffer.max 설정을 구성해야 할 수도 있습니다. TensorFlow 모델의 경우 사용자는 spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH를 true로 설정해야 합니다.
이 예에서는 %%configure 명령을 사용하여 Spark 구성을 전달하는 방법을 확인할 수 있습니다. 각 매개 변수의 자세한 의미는 Apache Spark 구성 설명서에 설명되어 있습니다. 제공된 값은 Azure Synapse GPU 대규모 풀에 대해 권장되는 모범 사례 값입니다.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
이 자습서에서는 다음 구성을 사용합니다.
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
참고 항목
Horovod를 사용하여 학습할 때 사용자는 numExecutors에 대한 Spark 구성을 노드 수보다 작거나 같도록 설정해야 합니다.
종속성 가져오기
이 자습서에서는 PySpark를 사용하여 데이터 세트를 읽고 처리합니다. 그런 다음 PyTorch와 Horovod를 사용하여 DNN(분산 신경망) 모델을 빌드하고 학습 프로세스를 실행합니다. 시작하려면 다음 종속성을 가져와야 합니다.
# base libs
import sys
import uuid
# numpy
import numpy as np
# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
# azure related
from azure.synapse.ml.horovodutils import AdlsStore
대체 스토리지 계정에 연결
중간 및 모델 데이터를 저장하려면 ADLS(Azure Data Lake Storage) 계정이 필요합니다. 대체 스토리지 계정을 사용하는 경우 계정에서 자동으로 인증하고 읽도록 연결된 서비스를 설정해야 합니다. 또한 remote_url, account_name 및 linked_service_name 속성을 수정해야 합니다.
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01 # learning rate for single node code
uuid_str = str(uuid.uuid4()) # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str
# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir
store = AdlsStore.create(adls_store_path,
storage_options={
'account_name': account_name,
'sas_token': sas_token
},
save_runs=True)
print(adls_store_path)
데이터 세트 준비
다음으로 학습을 위해 데이터 세트를 준비합니다. 이 자습서에서는 Azure Open Datasets의 MNIST 데이터 세트를 사용합니다.
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()
Apache Spark를 사용하여 데이터 처리
이제 Apache Spark 데이터 프레임을 만듭니다. 이 데이터 프레임은 학습을 위해 HorovodEstimator와 함께 사용됩니다.
# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)
# repartition DataFrame for training
train_df = df.repartition(num_proc)
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])
# show the dataset
train_df.show()
train_df.count()
DNN 모델 정의
데이터 세트 처리가 완료되면 이제 PyTorch 모델을 정의할 수 있습니다. 동일한 코드를 사용하여 단일 노드 PyTorch 모델을 학습시킬 수도 있습니다.
# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = x.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=lr_single_node * num_proc,
momentum=0.5) # notice the lr is scaled up
loss = nn.NLLLoss()
모델 학습
이제 Apache Spark 데이터 프레임 위에 Horovod Spark 추정기를 학습할 수 있습니다.
# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
stdout=sys.stdout,
stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(
backend=backend,
store=store,
partitions_per_process=1, # important for GPU training
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
batch_size=batch_size,
epochs=epochs,
validation=0.1,
verbose=2)
torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
학습된 모델 평가
학습 프로세스가 완료되면 테스트 데이터 세트에서 모델을 평가할 수 있습니다.
# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)
argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
labelCol='label',
metricName='accuracy')
print('Test accuracy:', evaluator.evaluate(pred_df))
리소스 정리
Spark 인스턴스가 종료되도록 하려면 연결된 세션(Notebook)을 종료합니다. Apache Spark 풀에 지정된 유휴 시간에 도달하면 풀이 종료됩니다. Notebook 오른쪽 상단에 있는 상태 표시줄에서 세션 중지를 선택할 수도 있습니다.