Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Petastorm to biblioteka dostępu do danych typu open source, która umożliwia trenowanie modeli uczenia głębokiego w jednym węźle lub rozproszone. Ta biblioteka umożliwia trenowanie bezpośrednio z zestawów danych w formacie Apache Parquet i zestawach danych ładowanych jako ramka danych platformy Apache Spark. Program Petastorm obsługuje popularne struktury szkoleniowe, takie jak TensorFlow i PyTorch.
Aby uzyskać więcej informacji na temat platformy Petastorm, możesz odwiedzić stronę Petastorm GitHub lub dokumentację interfejsu API Petastorm.
Wymagania wstępne
- Obszar roboczy usługi Azure Synapse Analytics z kontem magazynu usługi Azure Data Lake Storage Gen2 skonfigurowanym jako magazyn domyślny. Musisz być współautorem danych obiektu blob usługi Storage w systemie plików usługi Data Lake Storage Gen2, z którym pracujesz.
- Utwórz pulę platformy Apache Spark z obsługą procesora GPU w obszarze roboczym usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz Tworzenie puli platformy Apache Spark z obsługą procesora GPU w usłudze Azure Synapse. Na potrzeby tego samouczka zalecamy użycie dużego rozmiaru klastra GPU z 3 węzłami.
Uwaga
Wersja zapoznawcza pul z obsługą procesora GPU usługi Azure Synapse jest teraz przestarzała.
Konfigurowanie sesji platformy Apache Spark
Na początku sesji musimy skonfigurować kilka ustawień platformy Apache Spark. W większości przypadków musimy ustawić tylko wartości numExecutors i spark.rapids.memory.gpu.reserve. W tym przykładzie możesz zobaczyć, jak konfiguracje platformy Spark można przekazać za %%configure pomocą polecenia . Szczegółowe znaczenie każdego parametru wyjaśniono w dokumentacji konfiguracji platformy Apache Spark.
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Interfejsy API zapisu petastorm
Zestaw danych utworzony przy użyciu narzędzia Petastorm jest przechowywany w formacie Apache Parquet. Oprócz schematu Parquet petastorm przechowuje również informacje o schemacie wyższego poziomu, dzięki czemu tablice wielowymiarowe są przechowywane w natywnej części zestawu danych Petastorm.
W przykładzie utworzymy zestaw danych przy użyciu narzędzia PySpark. Zapisujemy zestaw danych na koncie usługi Azure Data Lake Storage Gen2.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
def row_generator(x):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return {'id': x,
'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
def generate_petastorm_dataset(output_url):
rowgroup_size_mb = 256
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
sc = spark.sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
rows_rdd = sc.parallelize(range(rows_count))\
.map(row_generator)\
.map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet(output_url)
output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)
Interfejsy API odczytu petastorm
Odczytywanie zestawu danych z podstawowego konta magazynu
Klasa petastorm.reader.Reader jest głównym punktem wejścia dla kodu użytkownika, który uzyskuje dostęp do danych z platformy ML, takiej jak TensorFlow lub PyTorch. Zestaw danych można odczytać przy użyciu petastorm.reader.Reader klasy i petastorm.make_reader metody factory.
W tym przykładzie możesz zobaczyć, jak można przekazać abfs protokół ADRESU URL.
from petastorm import make_reader
#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
for row in reader:
print(row)
Odczyt zestawu danych z pomocniczego konta magazynu
Jeśli używasz alternatywnego konta magazynu, pamiętaj, aby skonfigurować połączoną usługę w celu automatycznego uwierzytelniania i odczytywania z konta. Ponadto należy zmodyfikować następujące właściwości: remote_url, account_namei linked_service_name.
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
for row in reader:
print(row)
Odczyt zestawu danych w partiach
W tym przykładzie można zobaczyć, jak przekazać abfs protokół URL w celu odczytu danych w partiach. W tym przykładzie użyto make_batch_reader klasy .
from petastorm import make_batch_reader
with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
for schema_view in reader:
print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))
PyTorch API
Aby odczytać zestaw danych Petastorm z biblioteki PyTorch, możesz użyć klasy adaptera petastorm.pytorch.DataLoader . Ta karta umożliwia dostarczanie niestandardowych funkcji sortowania I przekształceń PyTorch.
W tym przykładzie pokażemy, jak można użyć narzędzia Petastorm DataLoader do załadowania zestawu danych Petastorm za pomocą interfejsu API make_reader. Ta pierwsza sekcja tworzy definicję Net klasy i traintest funkcji.
from __future__ import division, print_function
import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(model, device, train_loader, log_interval, optimizer, epoch):
model.train()
for batch_idx, row in enumerate(train_loader):
data, target = row['image'].to(device), row['digit'].to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), loss.item()))
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
count = 0
with torch.no_grad():
for row in test_loader:
data, target = row['image'].to(device), row['digit'].to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
count += data.shape[0]
test_loss /= count
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, count, 100. * correct / count))
def _transform_row(mnist_row):
# For this example, the images are stored as simpler ndarray (28,28), but the
# training network expects 3-dim images, hence the additional lambda transform.
transform = transforms.Compose([
transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# In addition, the petastorm pytorch DataLoader does not distinguish the notion of
# data or target transform, but that actually gives the user more flexibility
# to make the desired partial transform, as shown here.
result_row = {
'image': transform(mnist_row['image']),
'digit': mnist_row['digit']
}
return result_row
W tym przykładzie konto usługi Azure Data Lake Storage jest używane do przechowywania danych pośrednich. Aby przechowywać te dane, należy skonfigurować połączoną usługę na koncie magazynu i pobrać następujące informacje: remote_url, account_namei linked_service_name.
from petastorm import make_reader
# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)
# Read Petastorm dataset and apply custom PyTorch transformation functions
device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
loop_epochs = 1
reader_epochs = 1
transform = TransformSpec(_transform_row, removed_fields=['idx'])
for epoch in range(1, loop_epochs + 1):
with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
train(model, device, train_loader, 10, optimizer, epoch)
with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
test(model, device, test_loader)