Compartir a través de


Carga de datos con Petastorm (en desuso)

Petastorm es una biblioteca de acceso a datos de código abierto, que permite el entrenamiento distribuido o nodo único de modelos de aprendizaje profundo. Esta biblioteca permite el entrenamiento directamente desde conjuntos de datos en formato Apache Parquet y conjuntos de datos que se cargan como DataFrame de Apache Spark. Petastorm es compatible con marcos de entrenamiento populares, como TensorFlow y PyTorch.

Para más información sobre Petastorm, puede visitar la página de GitHub de Petastorm y la documentación de la API de Petastorm.

Requisitos previos

  • Necesitará un área de trabajo de Azure Synapse Analytics con una cuenta de almacenamiento de Azure Data Lake Storage Gen2 que esté configurada como almacenamiento predeterminado. Asegúrese de que es el colaborador de datos de Storage Blob en el sistema de archivos de Data Lake Storage Gen2 con el que trabaja.
  • Cree un grupo de Apache Spark habilitado para GPU en el área de trabajo de Azure Synapse Analytics. Para más información, consulte Creación de un grupo de Apache Spark habilitado para GPU en Azure Synapse. En este tutorial, se recomienda usar el tamaño del clúster de GPU grande con 3 nodos.

Nota:

La versión preliminar de los grupos habilitados para GPU de Azure Synapse ya está en desuso.

Configuración de la sesión de Apache Spark

Al principio de la sesión, es necesario configurar algunas opciones de Apache Spark. En la mayoría de los casos, solo es necesario establecer el numExecutors y spark.rapids.memory.gpu.reserve. En el ejemplo, puede ver cómo se pueden pasar las configuraciones de Spark con el comando %%configure . El significado detallado de cada parámetro se explica en la documentación sobre la configuración de Apache Spark.

%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

API de escritura de Petastorm

Un conjunto de datos creado con Petastorm se almacena en formato Apache Parquet. Además de un esquema Parquet, Petastorm también almacena información de esquema de nivel superior que convierte matrices multidimensionales en una parte nativa de un conjunto de datos de Petastorm.

En el ejemplo, se crea un conjunto de datos mediante PySpark. Se escribe el conjunto de datos en una cuenta de Azure Data Lake Storage Gen2.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)

API de lectura de Petastorm

Conjunto de datos de lectura de una cuenta de almacenamiento principal

La clase petastorm.reader.Reader es el punto de entrada principal para el código de usuario que accede a los datos desde un marco de ML, como TensorFlow o PyTorch. Puede leer un conjunto de datos mediante la clase petastorm.reader.Reader y el Factory Method petastorm.make_reader.

En el ejemplo, puede ver cómo puede pasar un protocolo de dirección URL de abfs .

from petastorm import make_reader

#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        print(row)

Conjunto de datos de lectura de una cuenta de almacenamiento secundaria

Si usa una cuenta de almacenamiento alternativa, asegúrese de configurar el servicio vinculado para autenticarse y leer automáticamente desde la cuenta. Además, debe modificar las siguientes propiedades: remote_url, account_namey linked_service_name.

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
    for row in reader:
        print(row)

Conjunto de datos de lectura en lotes

En el ejemplo, puede ver cómo puede pasar un protocolo de dirección URL abfs para leer datos en lotes. Este ejemplo utiliza la clase make_batch_reader.

from petastorm import make_batch_reader

with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
    for schema_view in reader:
        print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))

PyTorch API

Para leer un conjunto de datos de Petastorm de PyTorch, puede usar la clase de adaptador petastorm.pytorch.DataLoader. Este adaptador permite que se proporcionen transformaciones y funciones de intercalación personalizadas de PyTorch.

En este ejemplo, se muestra cómo se puede usar Petastorm DataLoader para cargar un conjunto de datos de Petastorm con la ayuda de la API make_reader. En esta primera sección se crea la definición de una clase Net y una función train y test.

from __future__ import division, print_function

import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(model, device, train_loader, log_interval, optimizer, epoch):
    model.train()
    for batch_idx, row in enumerate(train_loader):
        data, target = row['image'].to(device), row['digit'].to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for row in test_loader:
            data, target = row['image'].to(device), row['digit'].to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            count += data.shape[0]
    test_loss /= count
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, count, 100. * correct / count))

def _transform_row(mnist_row):
    # For this example, the images are stored as simpler ndarray (28,28), but the
    # training network expects 3-dim images, hence the additional lambda transform.
    transform = transforms.Compose([
        transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
    # data or target transform, but that actually gives the user more flexibility
    # to make the desired partial transform, as shown here.
    result_row = {
        'image': transform(mnist_row['image']),
        'digit': mnist_row['digit']
    }

    return result_row

En este ejemplo, se usa una cuenta de Azure Data Lake Storage para almacenar datos intermedios. Para almacenar estos datos, debe configurar un servicio vinculado en la cuenta de almacenamiento y recuperar los siguientes fragmentos de información: remote_url, account_name y linked_service_name.

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

# Read Petastorm dataset and apply custom PyTorch transformation functions 

device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

loop_epochs = 1
reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
        train(model, device, train_loader, 10, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
        test(model, device, test_loader)

Pasos siguientes