مشاركة عبر


تحميل البيانات باستخدام Petastorm (مهمل)

Petastorm هي مكتبة مصدر مفتوح للوصول إلى البيانات، والتي تمكن من التدريب على عقدة واحدة أو تدريب موزع لنماذج التعلم العميق. تتيح هذه المكتبة التدريب مباشرة من مجموعات البيانات بتنسيق Apache Parquet ومجموعات البيانات التي يتم تحميلها ك Apache Spark DataFrame. يدعم Petastorm أطر التدريب الشائعة مثل TensorFlow وPyTorch.

لمزيد من المعلومات بشأن Petastorm، يمكنك زيارة صفحة Petastorm GitHub أو وثائق Petastorm API.

المتطلبات الأساسية

  • مساحة عمل Azure Synapse Analytics مع حساب تخزين Azure Data Lake Storage Gen2 الذي تم تكوينه كمخزن افتراضي. يجب أن تكون Storage Blob Data Contributor لنظام ملفات Data Lake Storage Gen2 التي تعمل معها.
  • قم بإنشاء تجمع Apache Spark ممكّن بواسطة GPU في مساحة عمل تحليلات Azure Synapse. للحصول على التفاصيل، راجع إنشاء تجمع Apache Spark ممكّن بواسطة GPU في Azure Synapse. بالنسبة لهذا البرنامج التعليمي، نقترح استخدام حجم نظام مجموعة GPU-Large مع 3 عقد.

إشعار

تم الآن إهمال معاينة التجمعات الممكنة ل Azure Synapse GPU.

تكوين جلسة Apache Spark

في بداية الجلسة، نحتاج إلى تكوين بعض إعدادات Apache Spark. في معظم الحالات، نحتاج فقط إلى تعيين numExecutors و spark.rapids.memory.gpu.reserve. في المثال، يمكنك أن ترى كيف يمكن تمرير تكوينات Spark باستخدام %%configure الأمر . يتم شرح المعنى التفصيلي لكل معلمة في وثائق تكوين Apache Spark.

%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

واجهات برمجة التطبيقات لكتابة Petastorm

تُخزن مجموعة البيانات المنشأة باستخدام Petastorm بتنسيق Apache Parquet. بأعلى مخطط Parquet، تخزن Petastorm أيضاً معلومات مخطط ذات مستوى أعلى يُنشئ صفائف متعددة الأبعاد في جزء أصلي من مجموعة بيانات Petastorm.

في العينة، نقوم بإنشاء مجموعة بيانات باستخدام PySpark. نكتب مجموعة البيانات إلى حساب Azure Data Lake Storage Gen2.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)

واجهات برمجة تطبيقات قراءة Petastorm

قراءة مجموعة البيانات من حساب تخزين أساسي

petastorm.reader.Reader الفئة هي نقطة الإدخال الرئيسية للتعليمات البرمجية للمستخدم التي تصل إلى البيانات من إطار عمل التعلم الآلي مثل TensorFlow أو PyTorch. يمكنك قراءة مجموعة بيانات باستخدام فئة petastorm.reader.Reader وأسلوب المصنع petastorm.make_reader.

في المثال، يمكنك أن ترى كيف يمكنك تمرير abfs بروتوكول URL.

from petastorm import make_reader

#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        print(row)

قراءة مجموعة البيانات من حساب التخزين الثانوي

إذا كنت تستخدم حساب تخزين بديل، تأكد من إعداد الخدمة المرتبطة للمصادقة والقراءة تلقائيًا من الحساب. بالإضافة إلى ذلك، تحتاج إلى تعديل الخصائص التالية: remote_urlو account_nameو.linked_service_name

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
    for row in reader:
        print(row)

قراءة مجموعة البيانات على دفعات

في المثال، يمكنك أن ترى كيف يمكنك تمرير abfs بروتوكول URL لقراءة البيانات على دفعات. يستخدم هذا المثال فئة make_batch_reader.

from petastorm import make_batch_reader

with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
    for schema_view in reader:
        print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))

واجهة برمجة تطبيقات PyTorch

لقراءة مجموعة بيانات Petastorm من PyTorch، يمكنك استخدام الفئة petastorm.pytorch.DataLoader للمحول. يسمح هذا المحول بوظائف ترتيب PyTorch المخصصة والتحويلات التي سيتم توفيرها.

في هذا المثال، سنعرض كيفية استخدام Petastorm DataLoader لتحميل مجموعة بيانات Petastorm بمساعدة واجهة برمجة تطبيقات make_reader. ينشئ القسم الأول المُشار إليه تعريف فئة Net وtrain ووظيفة test.

from __future__ import division, print_function

import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(model, device, train_loader, log_interval, optimizer, epoch):
    model.train()
    for batch_idx, row in enumerate(train_loader):
        data, target = row['image'].to(device), row['digit'].to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for row in test_loader:
            data, target = row['image'].to(device), row['digit'].to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            count += data.shape[0]
    test_loss /= count
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, count, 100. * correct / count))

def _transform_row(mnist_row):
    # For this example, the images are stored as simpler ndarray (28,28), but the
    # training network expects 3-dim images, hence the additional lambda transform.
    transform = transforms.Compose([
        transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
    # data or target transform, but that actually gives the user more flexibility
    # to make the desired partial transform, as shown here.
    result_row = {
        'image': transform(mnist_row['image']),
        'digit': mnist_row['digit']
    }

    return result_row

في هذا المثال، يُستخدم حساب Azure Data Lake Storage لتخزين البيانات الوسيطة. لتخزين هذه البيانات، يجب إعداد خدمة مرتبطة بحساب التخزين واسترداد الأجزاء الآتية من المعلومات: remote_url، وaccount_name، وlinked_service_name.

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

# Read Petastorm dataset and apply custom PyTorch transformation functions 

device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

loop_epochs = 1
reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
        train(model, device, train_loader, 10, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
        test(model, device, test_loader)

الخطوات التالية