共用方式為


使用 Petastorm 載入資料 (已淘汰)

Petastorm 是開放原始碼資料存取程式庫,可啟用深度學習模型的單一節點或分散式定型。 此程式庫可實現直接從 Apache Parquet 格式的資料集和已載入為 Apache Spark DataFrame 的資料集進行定型。 Petastorm 支援熱門的訓練架構,例如 TensorFlow 和 PyTorch。

如需 Petastorm 的詳細資訊,您可以瀏覽 Petastorm GitHub 頁面Petastorm API 文件

必要條件

  • Azure Synapse Analytics 工作區,其中 Azure Data Lake Storage Gen2 儲存體帳戶已設定為預設儲存體。 使用 Data Lake Storage Gen2 檔案系統時,您必須是該檔案系統的儲存體 Blob 資料參與者
  • 在 Azure Synapse Analytics 工作區中啟用 GPU 的 Apache Spark 集區。 如需詳細資訊,請參閱在 Azure Synapse 中建立已啟用 GPU 的 Apache Spark 集區。 在本教學課程中,我們建議使用具有 3 個節點 GPU-Large 叢集大小。

注意

Azure Synapse 啟用 GPU 預覽功能集區現已取代。

警告

取代和停用適用於 Apache Spark 3.1 和 3.2 的 Azure Synapse 執行階段上的 GPU 通知

  • Apache Spark 3.2 (取代) 執行階段上的 GPU 加速預覽現已被取代。 取代的執行階段不會有 Bug 和功能修正。 自 2024 年 7 月 8 日起,Spark 3.2 上的此執行階段和對應的 GPU 加速預覽已淘汰和停用。
  • Azure Synapse 3.1 (取代) 執行階段上的 GPU 加速預覽現已被取代。 自 2023 年 1 月 26 日起,適用於 Apache Spark 3.1 的 Azure Synapse 執行階段已終止支援,官方支援已於 2024 年 1 月 26 日失效,且自此日之後,無法再處理任何支援票證、錯誤修正或安全性更新。

設定 Apache Spark 工作階段

在工作階段開始時,我們必須設定一些 Apache Spark 設定。 在大部分情況下,我們只需要設定 numExecutorsspark.rapids.memory.gpu.reserve。 在範例中,您可以看到如何使用 %%configure 命令傳遞 Spark 設定。 Apache Spark 設定文件中會說明每個參數的詳細意義。

%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g"
   }
}

Petastorm 寫入 API

使用 Petastorm 建立的資料集會以 Apache Parquet 格式儲存。 在 Parquet 結構描述之上,Petastorm 也會儲存較高層級的結構描述資訊,讓多維度陣列成為 Petastorm 資料集的原生部分。

在範例中,我們會使用 PySpark 建立資料集。 我們會將資料集寫入至 Azure Data Lake Storage Gen2 帳戶。

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


output_url = 'abfs://container_name@storage_account_url/data_dir' #use your own adls account info
generate_petastorm_dataset(output_url)

Petastorm 讀取 API

從主要儲存體帳戶讀取資料集

類別 petastorm.reader.Reader 是用戶程序代碼的主要進入點,可從 TensorFlow 或 PyTorch 等 ML 架構存取數據。 您可以使用 petastorm.reader.Reader 類別和 petastorm.make_reader Factory 方法讀取資料集。

在範例中,您可以看到如何傳遞 abfs URL 通訊協定。

from petastorm import make_reader

#on primary storage associated with the workspace, url can be abbreviated with container path for data directory
with make_reader('abfs://<container_name>/<data directory path>/') as reader:
    for row in reader:
        print(row)

從次要儲存體帳戶讀取資料集

如果您使用替代的儲存體帳戶,請務必將連結的服務設定為從帳戶自動驗證和讀取。 此外,您必須修改下列屬性:remote_urlaccount_namelinked_service_name

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<<adls account name>>"
linked_service_name = '<<linked service name>>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

with make_reader('{}/data_directory'.format(remote_url), storage_options = {'sas_token' : sas_token}) as reader:
    for row in reader:
        print(row)

以批次方式讀取資料集

在範例中,您可以看到如何傳遞 abfs URL 通訊協定,以批次方式讀取資料。 這個範例會使用 make_batch_reader 類別。

from petastorm import make_batch_reader

with make_batch_reader('abfs://<container_name>/<data directory path>/', schema_fields=["value1", "value2"]) as reader:
    for schema_view in reader:
        print("Batched read:\nvalue1: {0} value2: {1}".format(schema_view.value1, schema_view.value2))

PyTorch API

若要從 PyTorch 讀取 Petastorm 資料集,您可以使用配接器 petastorm.pytorch.DataLoader 類別。 此配接器允許提供自訂 PyTorch 定序函式和轉換。

在此範例中,我們將說明如何使用 Petastorm DataLoader,藉助 make_reader API 來載入 Petastorm 資料集。 第一個區段會建立 Net 類別以及 traintest 函式的定義。

from __future__ import division, print_function

import argparse
import pyarrow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader
from pyspark.sql.functions import col

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(model, device, train_loader, log_interval, optimizer, epoch):
    model.train()
    for batch_idx, row in enumerate(train_loader):
        data, target = row['image'].to(device), row['digit'].to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for row in test_loader:
            data, target = row['image'].to(device), row['digit'].to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            count += data.shape[0]
    test_loss /= count
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, count, 100. * correct / count))

def _transform_row(mnist_row):
    # For this example, the images are stored as simpler ndarray (28,28), but the
    # training network expects 3-dim images, hence the additional lambda transform.
    transform = transforms.Compose([
        transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
    # data or target transform, but that actually gives the user more flexibility
    # to make the desired partial transform, as shown here.
    result_row = {
        'image': transform(mnist_row['image']),
        'digit': mnist_row['digit']
    }

    return result_row

在此範例中,Azure Data Lake Storage 帳戶用來儲存中繼資料。 若要儲存此資料,您必須將連結服務設定為儲存體帳戶,並擷取下列資訊片段:remote_urlaccount_namelinked_service_name

from petastorm import make_reader

# create sas token for storage account access, use your own adls account info
remote_url = "abfs://container_name@storage_account_url"
account_name = "<account name>"
linked_service_name = '<linked service name>'
TokenLibrary = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
sas_token = TokenLibrary.getConnectionString(linked_service_name)

# Read Petastorm dataset and apply custom PyTorch transformation functions 

device = torch.device('cpu') #For GPU, it will be torch.device('cuda'). More details: https://pytorch.org/docs/stable/tensor_attributes.html#torch-device

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

loop_epochs = 1
reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform),batch_size=5) as train_loader:
        train(model, device, train_loader, 10, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(remote_url), num_epochs=reader_epochs, transform_spec=transform), batch_size=5) as test_loader:
        test(model, device, test_loader)

下一步