Udostępnij za pośrednictwem


Ładowanie danych za pomocą narzędzia Petastorm (przestarzałe)

Petastorm to biblioteka dostępu do danych typu open source, która umożliwia trenowanie modeli uczenia głębokiego w jednym węźle lub rozproszone. Ta biblioteka umożliwia trenowanie bezpośrednio z zestawów danych w formacie Apache Parquet i zestawach danych ładowanych jako ramka danych platformy Apache Spark. Program Petastorm obsługuje popularne struktury szkoleniowe, takie jak TensorFlow i PyTorch.

Aby uzyskać więcej informacji na temat platformy Petastorm, możesz odwiedzić stronę Petastorm GitHub lub dokumentację interfejsu API Petastorm.

Wymagania wstępne

  • Obszar roboczy usługi Azure Synapse Analytics z kontem magazynu usługi Azure Data Lake Storage Gen2 skonfigurowanym jako magazyn domyślny. Musisz być współautorem danych obiektu blob usługi Storage w systemie plików usługi Data Lake Storage Gen2, z którym pracujesz.
  • Utwórz pulę platformy Apache Spark z obsługą procesora GPU w obszarze roboczym usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz Tworzenie puli platformy Apache Spark z obsługą procesora GPU w usłudze Azure Synapse. Na potrzeby tego samouczka zalecamy użycie dużego rozmiaru klastra GPU z 3 węzłami.

Uwaga

Wersja zapoznawcza pul z obsługą procesora GPU usługi Azure Synapse jest teraz przestarzała.

Konfigurowanie sesji platformy Apache Spark

Na początku sesji musimy skonfigurować kilka ustawień platformy Apache Spark. W większości przypadków musimy ustawić tylko wartości numExecutors i spark.rapids.memory.gpu.reserve. W tym przykładzie możesz zobaczyć, jak konfiguracje platformy Spark można przekazać za %%configure pomocą polecenia . Szczegółowe znaczenie każdego parametru wyjaśniono w dokumentacji konfiguracji platformy Apache Spark.

%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Interfejsy API zapisu petastorm

Zestaw danych utworzony przy użyciu narzędzia Petastorm jest przechowywany w formacie Apache Parquet. Oprócz schematu Parquet petastorm przechowuje również informacje o schemacie wyższego poziomu, dzięki czemu tablice wielowymiarowe są przechowywane w natywnej części zestawu danych Petastorm.

W przykładzie utworzymy zestaw danych przy użyciu narzędzia PySpark. Zapisujemy zestaw danych na koncie usługi Azure Data Lake Storage Gen2.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)

Interfejsy API odczytu petastorm

Odczytywanie zestawu danych z podstawowego konta magazynu

Klasa petastorm.reader.Reader jest głównym punktem wejścia dla kodu użytkownika, który uzyskuje dostęp do danych z platformy ML, takiej jak TensorFlow lub PyTorch. Zestaw danych można odczytać przy użyciu petastorm.reader.Reader klasy i petastorm.make_reader metody factory.

W tym przykładzie możesz zobaczyć, jak można przekazać abfs protokół ADRESU URL.

from petastorm import make_reader

#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        print(row)

Odczyt zestawu danych z pomocniczego konta magazynu

Jeśli używasz alternatywnego konta magazynu, pamiętaj, aby skonfigurować połączoną usługę w celu automatycznego uwierzytelniania i odczytywania z konta. Ponadto należy zmodyfikować następujące właściwości: remote_url, account_namei linked_service_name.

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
    for row in reader:
        print(row)

Odczyt zestawu danych w partiach

W tym przykładzie można zobaczyć, jak przekazać abfs protokół URL w celu odczytu danych w partiach. W tym przykładzie użyto make_batch_reader klasy .

from petastorm import make_batch_reader

with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
    for schema_view in reader:
        print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))

PyTorch API

Aby odczytać zestaw danych Petastorm z biblioteki PyTorch, możesz użyć klasy adaptera petastorm.pytorch.DataLoader . Ta karta umożliwia dostarczanie niestandardowych funkcji sortowania I przekształceń PyTorch.

W tym przykładzie pokażemy, jak można użyć narzędzia Petastorm DataLoader do załadowania zestawu danych Petastorm za pomocą interfejsu API make_reader. Ta pierwsza sekcja tworzy definicję Net klasy i traintest funkcji.

from __future__ import division, print_function

import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(model, device, train_loader, log_interval, optimizer, epoch):
    model.train()
    for batch_idx, row in enumerate(train_loader):
        data, target = row['image'].to(device), row['digit'].to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for row in test_loader:
            data, target = row['image'].to(device), row['digit'].to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            count += data.shape[0]
    test_loss /= count
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, count, 100. * correct / count))

def _transform_row(mnist_row):
    # For this example, the images are stored as simpler ndarray (28,28), but the
    # training network expects 3-dim images, hence the additional lambda transform.
    transform = transforms.Compose([
        transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
    # data or target transform, but that actually gives the user more flexibility
    # to make the desired partial transform, as shown here.
    result_row = {
        'image': transform(mnist_row['image']),
        'digit': mnist_row['digit']
    }

    return result_row

W tym przykładzie konto usługi Azure Data Lake Storage jest używane do przechowywania danych pośrednich. Aby przechowywać te dane, należy skonfigurować połączoną usługę na koncie magazynu i pobrać następujące informacje: remote_url, account_namei linked_service_name.

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

# Read Petastorm dataset and apply custom PyTorch transformation functions 

device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

loop_epochs = 1
reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
        train(model, device, train_loader, 10, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
        test(model, device, test_loader)

Następne kroki