你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Petastorm 是一个开源数据访问库,可实现深度学习模型的单节点或分布式训练。 该库支持直接从 Apache Parquet 格式的数据集和已作为 Apache Spark 数据帧加载的数据集进行训练。 Petastorm 支持流行的训练框架,例如 TensorFlow 和 PyTorch。
有关 Petastorm 的详细信息,可以访问 Petastorm GitHub 页或 Petastorm API 文档。
先决条件
- Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
- 在 Azure Synapse Analytics 工作区中创建支持 GPU 的 Apache Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建支持 GPU 的 Apache Spark 池。 对于本教程,建议使用具有 3 个节点的大型 GPU 群集。
注意
已启用 Azure Synapse GPU 的池的预览版现已弃用。
配置 Apache Spark 会话
在会话开始时,我们需要配置一些 Apache Spark 设置。 在大多数情况下,我们只需要设置 numExecutors
和 spark.rapids.memory.gpu.reserve
。 在此示例中,你可以看到如何使用 %%configure
命令传递 Spark 配置。
Apache Spark 配置文档中详细解释了每个参数的含义。
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Petastorm 写入 API
使用 Petastorm 创建的数据集以 Apache Parquet 格式存储。 在 Parquet 模式之上,Petastorm 还存储更高级别的架构信息,这些信息使多维数组成为 Petastorm 数据集的本机部分。
在该示例中,我们使用 PySpark 创建了数据集。 我们将数据集写入 Azure Data Lake Storage Gen2 帐户。
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
def row_generator(x):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return {'id': x,
'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
def generate_petastorm_dataset(output_url):
rowgroup_size_mb = 256
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
sc = spark.sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
rows_rdd = sc.parallelize(range(rows_count))\
.map(row_generator)\
.map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet(output_url)
output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)
Petastorm 读取 API
从主存储帐户读取数据集
petastorm.reader.Reader
类是用户代码访问来自 ML 框架(例如 TensorFlow 或 PyTorch)的数据的主要入口点。 可以使用 petastorm.reader.Reader
类和 petastorm.make_reader
工厂方法读取数据集。
在该示例中,你可以看到如何传递 abfs
URL 协议。
from petastorm import make_reader
#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
for row in reader:
print(row)
从辅助存储帐户读取数据集
如果使用备用存储帐户,请确保对链接服务进行设置,以自动对帐户进行身份验证和读取。 此外,还需要修改以下属性:remote_url
、account_name
和 linked_service_name
。
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
for row in reader:
print(row)
批量读取数据集
在该示例中,你可以看到如何传递 abfs
URL 协议以批量读取数据。 此示例使用 make_batch_reader
类。
from petastorm import make_batch_reader
with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
for schema_view in reader:
print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))
PyTorch API
要从 PyTorch 读取 Petastorm 数据集,你可以使用适配器 petastorm.pytorch.DataLoader
类。 此适配器允许提供自定义 PyTorch 整理函数和转换。
在此示例中,我们将展示如何使用 Petastorm DataLoader 在 make_reader API 的帮助下加载 Petastorm 数据集。 第一部分创建 Net
类以及 train
和 test
函数的定义。
from __future__ import division, print_function
import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(model, device, train_loader, log_interval, optimizer, epoch):
model.train()
for batch_idx, row in enumerate(train_loader):
data, target = row['image'].to(device), row['digit'].to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), loss.item()))
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
count = 0
with torch.no_grad():
for row in test_loader:
data, target = row['image'].to(device), row['digit'].to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
count += data.shape[0]
test_loss /= count
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, count, 100. * correct / count))
def _transform_row(mnist_row):
# For this example, the images are stored as simpler ndarray (28,28), but the
# training network expects 3-dim images, hence the additional lambda transform.
transform = transforms.Compose([
transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# In addition, the petastorm pytorch DataLoader does not distinguish the notion of
# data or target transform, but that actually gives the user more flexibility
# to make the desired partial transform, as shown here.
result_row = {
'image': transform(mnist_row['image']),
'digit': mnist_row['digit']
}
return result_row
在此示例中,Azure Data Lake Storage 帐户用于存储中间数据。 要存储此数据,必须为存储帐户设置链接服务并检索以下信息:remote_url
、account_name
和 linked_service_name
。
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
# Read Petastorm dataset and apply custom PyTorch transformation functions
device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
loop_epochs = 1
reader_epochs = 1
transform = TransformSpec(_transform_row, removed_fields=['idx'])
for epoch in range(1, loop_epochs + 1):
with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
train(model, device, train_loader, 10, optimizer, epoch)
with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
test(model, device, test_loader)