Share via


Az Apache Spark (az Azure Synapse Analytics által működtetett) használata a gépi tanulási folyamatban (elavult)

ÉRVÉNYES:Python SDK azureml v1

Figyelmeztetés

A Python SDK 1-ben elérhető Azure Synapse Analytics-integráció és az Azure Machine Tanulás elavult. A felhasználók továbbra is használhatják az Azure Machine Tanulás regisztrált Synapse-munkaterületet társított szolgáltatásként. Új Synapse-munkaterületet azonban már nem lehet regisztrálni az Azure Machine Learninggel társított szolgáltatásként. Javasoljuk, hogy használjon kiszolgáló nélküli Spark-számítást és csatolt Synapse Spark-készleteket, amely a CLI v2 és a Python SDK v2-ben érhető el. További információ: https://aka.ms/aml-spark.

Ebből a cikkből megtudhatja, hogyan használhatja az Azure Synapse Analytics által üzemeltetett Apache Spark-készleteket számítási célként egy Azure Machine Tanulás-folyamat adatelőkészítési lépéséhez. Megtudhatja, hogy egy folyamat hogyan használhatja az adott lépéshez megfelelő számítási erőforrásokat – például adatelőkészítést vagy betanítást. Azt is megtudhatja, hogyan készítik elő az adatokat a Spark-lépéshez, és hogyan haladnak át a következő lépésre.

Előfeltételek

Az Apache Spark-készleteket egy Azure Synapse Analytics-munkaterületen hozhatja létre és felügyelheti. Ha egy Apache Spark-készletet egy Azure Machine Tanulás-munkaterülettel szeretne integrálni, az Azure Synapse Analytics-munkaterületre kell hivatkoznia. Miután összekapcsolta az Azure Machine Tanulás-munkaterületet és az Azure Synapse Analytics-munkaterületeket, csatolhat egy Apache Spark-készletet a

  • Azure Machine Learning Studio

  • Python SDK, a későbbiekben ismertetett módon

  • Azure Resource Manager- (ARM-) sablon. További információkért látogasson el az ARM-példasablonra

    • A parancssor segítségével követheti az ARM-sablont, hozzáadhatja a csatolt szolgáltatást, és csatolhatja az Apache Spark-készletet a következő kódmintával:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Fontos

A Synapse-munkaterülethez való sikeres csatoláshoz meg kell adni a Synapse-munkaterület tulajdonosi szerepkörét. A hozzáférését az Azure Portalon ellenőrizheti.

A társított szolgáltatás a létrehozáskor kap egy rendszer által hozzárendelt felügyelt identitást (SAI). Ezt a hivatkozásszolgáltatást a Synapse Studio "Synapse Apache Spark-rendszergazda" szerepköréhez kell hozzárendelnie, hogy elküldhesse a Spark-feladatot (lásd : Synapse RBAC-szerepkör-hozzárendelések kezelése a Synapse Studióban).

Az Azure Machine Tanulás-munkaterület felhasználójának is meg kell adnia a "Közreműködő" szerepkört az Erőforrás-kezelés Azure Portalján.

Ez a kód bemutatja, hogyan kérhetők le csatolt szolgáltatások a munkaterületen:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Workspace.from_config() Először is hozzáfér az Azure Machine Tanulás-munkaterülethez a config.json fájl konfigurációjával. (További információkért látogasson el a Munkaterület konfigurációs fájljának létrehozása). Ezután a kód kinyomtatja a munkaterületen elérhető összes társított szolgáltatást. LinkedService.get() Végül lekéri a csatolt szolgáltatást.'synapselink1'

Az Apache Spark-készlet csatlakoztatása számítási célként az Azure Machine Tanulás

Ha az Apache Spark-készletet szeretné használni a gépi tanulási folyamat egy lépésének meghajtásához, csatolnia ComputeTarget kell azt a folyamatlépéshez, ahogyan az ebben a kódmintában látható:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

A kód először konfigurálja a SynapseCompute. Az linked_service argumentum az előző LinkedService lépésben létrehozott vagy lekért objektum. Az type argumentumnak a következőnek kell lennie SynapseSpark: . A pool_name benne lévő SynapseCompute.attach_configuration() argumentumnak meg kell egyeznie az Azure Synapse Analytics-munkaterület egy meglévő készletével. Az Apache Spark-készlet Azure Synapse Analytics-munkaterületen való létrehozásáról további információt a Rövid útmutatóban talál : Kiszolgáló nélküli Apache Spark-készlet létrehozása a Synapse Studióval. A attach_config típus a következő ComputeTargetAttachConfiguration: .

A konfiguráció létrehozása után hozzon létre egy gépi tanulást a gépi tanulásban ComputeTarget a Workspace gépi tanulási munkaterületen belüli számítási feladathoz tartozó név és ComputeTargetAttachConfiguration értékek megadásával. A hívás ComputeTarget.attach() aszinkron, ezért a minta le lesz tiltva, amíg a hívás befejeződik.

SynapseSparkStep Csatolt Apache Spark-készletet használó készlet létrehozása

Az Apache Spark-készleten lévő Spark-mintajegyzetfüzet-feladat egy egyszerű gépi tanulási folyamatot határoz meg. Először is a jegyzetfüzet egy adat-előkészítési lépést határoz meg, amelyet az synapse_compute előző lépésben meghatározottak hajtanak végre. Ezután a jegyzetfüzet egy betanításhoz jobban megfelelő számítási cél által hajtott betanítási lépést határoz meg. A mintajegyzetfüzet a Titanic túlélési adatbázisát használja az adatbemenet és a kimenet megjelenítéséhez. Valójában nem tisztítja meg az adatokat, és nem készít prediktív modellt. Mivel ez a minta nem igazán jár betanítással, a betanítási lépés egy olcsó, CPU-alapú számítási erőforrást használ.

Az adatok objektumokon keresztül DatasetConsumptionConfig áramlik egy gépi tanulási folyamatba, amely táblázatos adatokat vagy fájlhalmazokat tárolhat. Az adatok gyakran egy munkaterületi adattár blobtárolójában lévő fájlokból származnak. Ez a kódminta egy gépi tanulási folyamat bemenetét létrehozó tipikus kódot mutatja be:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

A kódminta feltételezi, hogy a fájl Titanic.csv blobtárolóban található. A kód bemutatja, hogyan olvasható be a fájl mind a TabularDatasetFileDataset. Ez a kód csak szemléltetési célokat szolgál, mert zavaró lenne duplikálni a bemeneteket, vagy egyetlen adatforrást táblatartalmú erőforrásként és szigorúan fájlként értelmezni.

Fontos

Bemenetként való használatához FileDataset legalább 1.20.0egy azureml-core verzióra van szüksége. Ezt az Environment osztálysal is megadhatja, ahogy azt a későbbiekben tárgyaljuk. Amikor egy lépés befejeződik, a kimeneti adatokat az alábbi kódmintában látható módon tárolhatja:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

Ebben a kódmintában a rendszer az datastore adatokat egy nevesített testfájlban tárolja. Az adatok a gépi tanulási munkaterületen lennének elérhetők, a Datasetnévvel registered_datasetegyütt.

Az adatok mellett a folyamatlépések lépésenkénti Python-függőségekkel is rendelkezhetnek. Emellett az egyes SynapseSparkStep objektumok pontos Azure Synapse Apache Spark-konfigurációt is megadhatnak. Ennek megjelenítéséhez a következő kódminta azt határozza meg, hogy a azureml-core csomagverziónak legalább 1.20.0a következőnek kell lennie. Ahogy korábban említettük, a azureml-core csomagra vonatkozó követelménynek bemenetként kell használnia FileDataset .

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Ez a kód egyetlen lépést határoz meg az Azure Machine Tanulás folyamatában. A environment kód értéke beállít egy adott azureml-core verziót, és a kód szükség szerint további conda- vagy pip-függőségeket is hozzáadhat.

A SynapseSparkStep zip-fájl feltölti az ./code alkönyvtárat a helyi számítógépről. Ez a könyvtár újra létre lesz hozva a számítási kiszolgálón, és a lépés az adott könyvtárból futtatja a dataprep.py szkriptet. Ennek inputs a lépésnek az step1_input1step1_input2step1_output és outputs az objektumai a korábban tárgyaltak. Ezeknek az értékeknek a szkripten belüli elérésének legegyszerűbb módja a dataprep.py névvel ellátott argumentsértékek társítása.

A konstruktor következő argumentumkészlete az SynapseSparkStep Apache Sparkot vezérli. Ez compute_target az, 'link1-spark01' amelyet korábban számítási célként csatoltunk. A többi paraméter megadja a használni kívánt memóriát és magokat.

A mintajegyzetfüzet a következő kódot dataprep.pyhasználja:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Ez az "adat-előkészítési" szkript nem végez valós adatátalakítást, de bemutatja, hogyan lehet adatokat lekérni, Spark-adatkeretté konvertálni, és hogyan lehet elvégezni néhány alapvető Apache Spark-manipulációt. Ha meg szeretné keresni a kimenetet az Azure Machine Tanulás Studióban, nyissa meg a gyermekfeladatot, válassza a Kimenetek + naplók lapot, és nyissa meg a logs/azureml/driver/stdout fájlt, ahogyan az a képernyőképen látható:

Screenshot of Studio showing stdout tab of child job

SynapseSparkStep A folyamaton belüli használat

A következő példa az előző szakaszban létrehozott kimenetet SynapseSparkStep használja. A folyamat más lépései saját egyedi környezetekkel rendelkezhetnek, és a feladatnak megfelelő különböző számítási erőforrásokon futtathatók. A mintajegyzetfüzet egy kis CPU-fürtön futtatja a betanítási lépést:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Ez a kód szükség esetén létrehozza az új számítási erőforrást. Ezután átalakítja az step1_output eredményt a betanítási lépés bemeneteként. A as_download() beállítás azt jelenti, hogy az adatok a számítási erőforrásba kerülnek, ami gyorsabb hozzáférést eredményez. Ha az adatok olyan nagyok, hogy nem férnek el a helyi számítási merevlemezen, akkor a as_mount() fájlrendszerrel kell streamelnie az FUSE adatokat. A compute_target második lépés nem az 'cpucluster''link1-spark01' adatelőkészítési lépésben használt erőforrás. Ez a lépés az előző lépésben használt szkript helyett dataprep.py egy egyszerű train.py szkriptet használ. A mintajegyzetfüzet a szkript részleteit train.py tartalmazza.

Az összes lépés definiálása után létrehozhatja és futtathatja a folyamatot.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Ez a kód létrehoz egy folyamatot, amely az Apache Spark-készletek adat-előkészítési lépéséből áll, az Azure Synapse Analytics (step_1) és a betanítási lépés (step_2) segítségével. Az Azure megvizsgálja a végrehajtási gráf kiszámításának lépései közötti adatfüggőségeket. Ebben az esetben csak egy egyszerű függőség létezik. step2_input Itt feltétlenül szükség van step1_output.

A pipeline.submit hívás szükség esetén létrehoz egy kísérletet synapse-pipeline, és aszinkron módon elindít egy feladatot benne. A folyamat egyes lépései ennek a fő feladatnak a gyermekfeladataként futnak, és a Studio Kísérletek lapja figyelheti és áttekintheti ezeket a lépéseket.

Következő lépések