Load data with Petastorm (deprecated)
Petastorm is an open source data access library that enables single-node or distributed training of deep learning models. The library supports training directly from datasets in Apache Parquet format and from datasets already loaded as Apache Spark DataFrames. Petastorm supports popular training frameworks such as TensorFlow and PyTorch.
For more information about Petastorm, you can visit the Petastorm GitHub page or the Petastorm API documentation.
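For example, a dataset written with Petastorm can also be consumed from TensorFlow through the petastorm.tf_utils.make_petastorm_dataset helper, which wraps a Petastorm reader in a tf.data.Dataset. The following is a minimal sketch, assuming a Petastorm dataset already exists at the placeholder URL:
from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

# Placeholder URL; point this at an existing Petastorm dataset
dataset_url = 'abfs://<container_name>/<data directory path>/'

with make_reader(dataset_url) as reader:
    # Wrap the Petastorm reader in a tf.data.Dataset and inspect a couple of records
    dataset = make_petastorm_dataset(reader)
    for record in dataset.take(2):
        print(record)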
Prerequisites
- Azure Synapse Analytics workspace with an Azure Data Lake Storage Gen2 storage account configured as the default storage. You need to be the Storage Blob Data Contributor of the Data Lake Storage Gen2 file system that you work with.
- Create a GPU-enabled Apache Spark pool in your Azure Synapse Analytics workspace. For details, see Create a GPU-enabled Apache Spark pool in Azure Synapse. For this tutorial, we suggest using the GPU-Large cluster size with 3 nodes.
Note
The Preview for Azure Synapse GPU-enabled pools has now been deprecated.
Caution
Deprecation and disablement notification for GPUs on the Azure Synapse Runtime for Apache Spark 3.1 and 3.2
- The GPU-accelerated preview is deprecated on the Apache Spark 3.2 (deprecated) runtime. Deprecated runtimes don't receive bug or feature fixes. This runtime and the corresponding GPU-accelerated preview on Spark 3.2 have been retired and disabled as of July 8, 2024.
- The GPU-accelerated preview is deprecated on the Azure Synapse Runtime for Apache Spark 3.1 (deprecated). Azure Synapse Runtime for Apache Spark 3.1 reached end of support on January 26, 2023, and official support was discontinued on January 26, 2024. Support tickets, bug fixes, and security updates are no longer addressed beyond that date.
Configure the Apache Spark session
At the start of the session, we need to configure a few Apache Spark settings. In most cases, we only need to set numExecutors and spark.rapids.memory.gpu.reserve. The following example shows how the Spark configurations can be passed with the %%configure command. The detailed meaning of each parameter is explained in the Apache Spark configuration documentation.
%%configure -f
{
    "numExecutors": 3,
    "conf": {
        "spark.rapids.memory.gpu.reserve": "10g"
    }
}
Petastorm write APIs
A dataset created using Petastorm is stored in Apache Parquet format. On top of the Parquet schema, Petastorm also stores higher-level schema information that makes multidimensional arrays a native part of a Petastorm dataset.
In the following sample, we create a dataset using PySpark and write it to an Azure Data Lake Storage Gen2 account.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The Unischema defines the fields, types, shapes, and codecs of the dataset
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap the dataset materialization portion. It takes care of setting up Spark environment
    # variables as well as saving Petastorm-specific metadata.
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


output_url = 'abfs://container_name@storage_account_url/data_dir'  # Use your own ADLS account info
generate_petastorm_dataset(output_url)
Petastorm read APIs
Read dataset from a primary storage account
The petastorm.reader.Reader class is the main entry point for user code that accesses the data from an ML framework such as TensorFlow or PyTorch. You can read a dataset by using the petastorm.reader.Reader class and the petastorm.make_reader factory method.
In the following example, you can see how to pass an abfs URL protocol.
from petastorm import make_reader

# On primary storage associated with the workspace, the URL can be abbreviated with the container path for the data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        print(row)
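Rows returned by make_reader are namedtuple-like objects whose fields match the Unischema that the dataset was written with. As a minimal sketch, assuming the HelloWorldSchema dataset created earlier, individual fields can be accessed by name:
from petastorm import make_reader

with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        # Each field defined in the Unischema is available as a decoded attribute
        print(row.id, row.image1.shape, row.array_4d.shape)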
Read dataset from a secondary storage account
If you're using an alternative storage account, be sure to set up the linked service to automatically authenticate and read from the account. In addition, you need to modify the following properties: remote_url, account_name, and linked_service_name.
from petastorm import make_reader

# Create a SAS token for storage account access; use your own ADLS account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

with make_reader('{}/data_directory'.format(remote_url), storage_options={'sas_token': sas_token}) as reader:
    for row in reader:
        print(row)
Read dataset in batches
In the following example, you can see how to pass an abfs URL protocol to read data in batches. This example uses the make_batch_reader function.
from petastorm import make_batch_reader

with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
    for schema_view in reader:
        print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))
PyTorch API
To read a Petastorm dataset from PyTorch, you can use the adapter petastorm.pytorch.DataLoader class. This adapter allows custom PyTorch collating functions and transforms to be supplied.
In this example, we show how the Petastorm DataLoader can be used to load a Petastorm dataset with the help of the make_reader API. This first section creates the definition of a Net class and the train and test functions.
from __future__ import division, print_function

import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(model, device, train_loader, log_interval, optimizer, epoch):
    model.train()
    for batch_idx, row in enumerate(train_loader):
        data, target = row['image'].to(device), row['digit'].to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for row in test_loader:
            data, target = row['image'].to(device), row['digit'].to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            count += data.shape[0]
    test_loss /= count
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, count, 100. * correct / count))


def _transform_row(mnist_row):
    # For this example, the images are stored as simpler ndarray (28, 28), but the
    # training network expects 3-dim images, hence the additional lambda transform.
    transform = transforms.Compose([
        transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
    # data or target transform, but that actually gives the user more flexibility
    # to make the desired partial transform, as shown here.
    result_row = {
        'image': transform(mnist_row['image']),
        'digit': mnist_row['digit']
    }

    return result_row
In this example, an Azure Data Lake Storage account is used to store intermediate data. To store this data, you must set up a linked service to the storage account and retrieve the following pieces of information: remote_url, account_name, and linked_service_name.
from petastorm import make_reader

# Create a SAS token for storage account access; use your own ADLS account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

# Read the Petastorm dataset and apply custom PyTorch transformation functions
device = torch.device('cpu')  # For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

loop_epochs = 1
reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as train_loader:
        train(model, device, train_loader, 10, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
        test(model, device, test_loader)
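As mentioned earlier, petastorm.pytorch.DataLoader also accepts a custom collating function through its collate_fn parameter. The following sketch reuses remote_url and transform from the previous snippet; the custom_collate function shown here is a hypothetical example that stacks the transformed rows into batch tensors explicitly:
import torch
from petastorm import make_reader
from petastorm.pytorch import DataLoader

def custom_collate(rows):
    # Hypothetical collate function: each element of rows is a dict with the
    # transformed 'image' tensor and 'digit' label produced by _transform_row
    return {
        'image': torch.stack([row['image'] for row in rows]),
        'digit': torch.as_tensor([row['digit'] for row in rows])
    }

with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=1, transform_spec=transform),
                batch_size=5, collate_fn=custom_collate) as train_loader:
    for batch in train_loader:
        print(batch['image'].shape, batch['digit'].shape)
        break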