Petastorm 是開放原始碼資料存取程式庫,可啟用深度學習模型的單一節點或分散式定型。 此程式庫可實現直接從 Apache Parquet 格式的資料集和已載入為 Apache Spark DataFrame 的資料集進行定型。 Petastorm 支援熱門的訓練架構,例如 TensorFlow 和 PyTorch。
如需 Petastorm 的詳細資訊,您可以瀏覽 Petastorm GitHub 頁面或 Petastorm API 文件。
必要條件
- Azure Synapse Analytics 工作區,其中 Azure Data Lake Storage Gen2 儲存體帳戶已設定為預設儲存體。 使用 Data Lake Storage Gen2 檔案系統時,您必須是該檔案系統的儲存體 Blob 資料參與者。
- 在 Azure Synapse Analytics 工作區中啟用 GPU 的 Apache Spark 集區。 如需詳細資訊,請參閱在 Azure Synapse 中建立已啟用 GPU 的 Apache Spark 集區。 在本教學課程中,我們建議使用具有 3 個節點 GPU-Large 叢集大小。
注意
Azure Synapse 啟用 GPU 預覽功能集區現已取代。
警告
取代和停用適用於 Apache Spark 3.1 和 3.2 的 Azure Synapse 執行階段上的 GPU 通知
- Apache Spark 3.2 (取代) 執行階段上的 GPU 加速預覽現已被取代。 取代的執行階段不會有 Bug 和功能修正。 自 2024 年 7 月 8 日起,Spark 3.2 上的此執行階段和對應的 GPU 加速預覽已淘汰和停用。
- Azure Synapse 3.1 (取代) 執行階段上的 GPU 加速預覽現已被取代。 自 2023 年 1 月 26 日起,適用於 Apache Spark 3.1 的 Azure Synapse 執行階段已終止支援,官方支援已於 2024 年 1 月 26 日失效,且自此日之後,無法再處理任何支援票證、錯誤修正或安全性更新。
設定 Apache Spark 工作階段
在工作階段開始時,我們必須設定一些 Apache Spark 設定。 在大部分情況下,我們只需要設定 numExecutors
和 spark.rapids.memory.gpu.reserve
。 在範例中,您可以看到如何使用 %%configure
命令傳遞 Spark 設定。
Apache Spark 設定文件中會說明每個參數的詳細意義。
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Petastorm 寫入 API
使用 Petastorm 建立的資料集會以 Apache Parquet 格式儲存。 在 Parquet 結構描述之上,Petastorm 也會儲存較高層級的結構描述資訊,讓多維度陣列成為 Petastorm 資料集的原生部分。
在範例中,我們會使用 PySpark 建立資料集。 我們會將資料集寫入至 Azure Data Lake Storage Gen2 帳戶。
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
def row_generator(x):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return {'id': x,
'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
def generate_petastorm_dataset(output_url):
rowgroup_size_mb = 256
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
sc = spark.sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
rows_rdd = sc.parallelize(range(rows_count))\
.map(row_generator)\
.map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet(output_url)
output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)
Petastorm 讀取 API
從主要儲存體帳戶讀取資料集
類別 petastorm.reader.Reader
是用戶程序代碼的主要進入點,可從 TensorFlow 或 PyTorch 等 ML 架構存取數據。 您可以使用 petastorm.reader.Reader
類別和 petastorm.make_reader
Factory 方法讀取資料集。
在範例中,您可以看到如何傳遞 abfs
URL 通訊協定。
from petastorm import make_reader
#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
for row in reader:
print(row)
從次要儲存體帳戶讀取資料集
如果您使用替代的儲存體帳戶,請務必將連結的服務設定為從帳戶自動驗證和讀取。 此外,您必須修改下列屬性:remote_url
、account_name
和 linked_service_name
。
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
for row in reader:
print(row)
以批次方式讀取資料集
在範例中,您可以看到如何傳遞 abfs
URL 通訊協定,以批次方式讀取資料。 這個範例會使用 make_batch_reader
類別。
from petastorm import make_batch_reader
with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
for schema_view in reader:
print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))
PyTorch API
若要從 PyTorch 讀取 Petastorm 資料集,您可以使用配接器 petastorm.pytorch.DataLoader
類別。 此配接器允許提供自訂 PyTorch 定序函式和轉換。
在此範例中,我們將說明如何使用 Petastorm DataLoader,藉助 make_reader API 來載入 Petastorm 資料集。 第一個區段會建立 Net
類別以及 train
和 test
函式的定義。
from __future__ import division, print_function
import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(model, device, train_loader, log_interval, optimizer, epoch):
model.train()
for batch_idx, row in enumerate(train_loader):
data, target = row['image'].to(device), row['digit'].to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), loss.item()))
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
count = 0
with torch.no_grad():
for row in test_loader:
data, target = row['image'].to(device), row['digit'].to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
count += data.shape[0]
test_loss /= count
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, count, 100. * correct / count))
def _transform_row(mnist_row):
# For this example, the images are stored as simpler ndarray (28,28), but the
# training network expects 3-dim images, hence the additional lambda transform.
transform = transforms.Compose([
transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# In addition, the petastorm pytorch DataLoader does not distinguish the notion of
# data or target transform, but that actually gives the user more flexibility
# to make the desired partial transform, as shown here.
result_row = {
'image': transform(mnist_row['image']),
'digit': mnist_row['digit']
}
return result_row
在此範例中,Azure Data Lake Storage 帳戶用來儲存中繼資料。 若要儲存此資料,您必須將連結服務設定為儲存體帳戶,並擷取下列資訊片段:remote_url
、account_name
和 linked_service_name
。
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
# Read Petastorm dataset and apply custom PyTorch transformation functions
device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
loop_epochs = 1
reader_epochs = 1
transform = TransformSpec(_transform_row, removed_fields=['idx'])
for epoch in range(1, loop_epochs + 1):
with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
train(model, device, train_loader, 10, optimizer, epoch)
with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
test(model, device, test_loader)