Az Apache Spark (az Azure Synapse Analytics által működtetett) használata a gépi tanulási folyamatban (elavult)
ÉRVÉNYES:Python SDK azureml v1
Figyelmeztetés
A Python SDK 1-ben elérhető Azure Synapse Analytics-integráció és az Azure Machine Tanulás elavult. A felhasználók továbbra is használhatják az Azure Machine Tanulás regisztrált Synapse-munkaterületet társított szolgáltatásként. Új Synapse-munkaterületet azonban már nem lehet regisztrálni az Azure Machine Learninggel társított szolgáltatásként. Javasoljuk, hogy használjon kiszolgáló nélküli Spark-számítást és csatolt Synapse Spark-készleteket, amely a CLI v2 és a Python SDK v2-ben érhető el. További információ: https://aka.ms/aml-spark.
Ebből a cikkből megtudhatja, hogyan használhatja az Azure Synapse Analytics által üzemeltetett Apache Spark-készleteket számítási célként egy Azure Machine Tanulás-folyamat adatelőkészítési lépéséhez. Megtudhatja, hogy egy folyamat hogyan használhatja az adott lépéshez megfelelő számítási erőforrásokat – például adatelőkészítést vagy betanítást. Azt is megtudhatja, hogyan készítik elő az adatokat a Spark-lépéshez, és hogyan haladnak át a következő lépésre.
Előfeltételek
Azure Machine Tanulás-munkaterület létrehozása az összes folyamaterőforrás tárolásához
A fejlesztési környezet konfigurálása az Azure Machine Tanulás SDK telepítéséhez, vagy azure machine Tanulás számítási példány használata a már telepített SDK-val
Azure Synapse Analytics-munkaterület és Apache Spark-készlet létrehozása. További információ : Rövid útmutató: Kiszolgáló nélküli Apache Spark-készlet létrehozása a Synapse Studióval
Az Azure Machine Tanulás-munkaterület és az Azure Synapse Analytics-munkaterület összekapcsolása
Az Apache Spark-készleteket egy Azure Synapse Analytics-munkaterületen hozhatja létre és felügyelheti. Ha egy Apache Spark-készletet egy Azure Machine Tanulás-munkaterülettel szeretne integrálni, az Azure Synapse Analytics-munkaterületre kell hivatkoznia. Miután összekapcsolta az Azure Machine Tanulás-munkaterületet és az Azure Synapse Analytics-munkaterületeket, csatolhat egy Apache Spark-készletet a
Python SDK, a későbbiekben ismertetett módon
Azure Resource Manager- (ARM-) sablon. További információkért látogasson el az ARM-példasablonra
- A parancssor segítségével követheti az ARM-sablont, hozzáadhatja a csatolt szolgáltatást, és csatolhatja az Apache Spark-készletet a következő kódmintával:
az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
Fontos
A Synapse-munkaterülethez való sikeres csatoláshoz meg kell adni a Synapse-munkaterület tulajdonosi szerepkörét. A hozzáférését az Azure Portalon ellenőrizheti.
A társított szolgáltatás a létrehozáskor kap egy rendszer által hozzárendelt felügyelt identitást (SAI). Ezt a hivatkozásszolgáltatást a Synapse Studio "Synapse Apache Spark-rendszergazda" szerepköréhez kell hozzárendelnie, hogy elküldhesse a Spark-feladatot (lásd : Synapse RBAC-szerepkör-hozzárendelések kezelése a Synapse Studióban).
Az Azure Machine Tanulás-munkaterület felhasználójának is meg kell adnia a "Közreműködő" szerepkört az Erőforrás-kezelés Azure Portalján.
Az Azure Synapse Analytics-munkaterület és az Azure Machine Tanulás-munkaterület közötti hivatkozás lekérése
Ez a kód bemutatja, hogyan kérhetők le csatolt szolgáltatások a munkaterületen:
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration
ws = Workspace.from_config()
for service in LinkedService.list(ws) :
print(f"Service: {service}")
# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')
Workspace.from_config()
Először is hozzáfér az Azure Machine Tanulás-munkaterülethez a config.json
fájl konfigurációjával. (További információkért látogasson el a Munkaterület konfigurációs fájljának létrehozása). Ezután a kód kinyomtatja a munkaterületen elérhető összes társított szolgáltatást. LinkedService.get()
Végül lekéri a csatolt szolgáltatást.'synapselink1'
Az Apache Spark-készlet csatlakoztatása számítási célként az Azure Machine Tanulás
Ha az Apache Spark-készletet szeretné használni a gépi tanulási folyamat egy lépésének meghajtásához, csatolnia ComputeTarget
kell azt a folyamatlépéshez, ahogyan az ebben a kódmintában látható:
from azureml.core.compute import SynapseCompute, ComputeTarget
attach_config = SynapseCompute.attach_configuration(
linked_service = linked_service,
type="SynapseSpark",
pool_name="spark01") # This name comes from your Synapse workspace
synapse_compute=ComputeTarget.attach(
workspace=ws,
name='link1-spark01',
attach_configuration=attach_config)
synapse_compute.wait_for_completion()
A kód először konfigurálja a SynapseCompute
. Az linked_service
argumentum az előző LinkedService
lépésben létrehozott vagy lekért objektum. Az type
argumentumnak a következőnek kell lennie SynapseSpark
: . A pool_name
benne lévő SynapseCompute.attach_configuration()
argumentumnak meg kell egyeznie az Azure Synapse Analytics-munkaterület egy meglévő készletével. Az Apache Spark-készlet Azure Synapse Analytics-munkaterületen való létrehozásáról további információt a Rövid útmutatóban talál : Kiszolgáló nélküli Apache Spark-készlet létrehozása a Synapse Studióval. A attach_config
típus a következő ComputeTargetAttachConfiguration
: .
A konfiguráció létrehozása után hozzon létre egy gépi tanulást a gépi tanulásban ComputeTarget
a Workspace
gépi tanulási munkaterületen belüli számítási feladathoz tartozó név és ComputeTargetAttachConfiguration
értékek megadásával. A hívás ComputeTarget.attach()
aszinkron, ezért a minta le lesz tiltva, amíg a hívás befejeződik.
SynapseSparkStep
Csatolt Apache Spark-készletet használó készlet létrehozása
Az Apache Spark-készleten lévő Spark-mintajegyzetfüzet-feladat egy egyszerű gépi tanulási folyamatot határoz meg. Először is a jegyzetfüzet egy adat-előkészítési lépést határoz meg, amelyet az synapse_compute
előző lépésben meghatározottak hajtanak végre. Ezután a jegyzetfüzet egy betanításhoz jobban megfelelő számítási cél által hajtott betanítási lépést határoz meg. A mintajegyzetfüzet a Titanic túlélési adatbázisát használja az adatbemenet és a kimenet megjelenítéséhez. Valójában nem tisztítja meg az adatokat, és nem készít prediktív modellt. Mivel ez a minta nem igazán jár betanítással, a betanítási lépés egy olcsó, CPU-alapú számítási erőforrást használ.
Az adatok objektumokon keresztül DatasetConsumptionConfig
áramlik egy gépi tanulási folyamatba, amely táblázatos adatokat vagy fájlhalmazokat tárolhat. Az adatok gyakran egy munkaterületi adattár blobtárolójában lévő fájlokból származnak. Ez a kódminta egy gépi tanulási folyamat bemenetét létrehozó tipikus kódot mutatja be:
from azureml.core import Dataset
datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")
# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()
A kódminta feltételezi, hogy a fájl Titanic.csv
blobtárolóban található. A kód bemutatja, hogyan olvasható be a fájl mind a TabularDataset
FileDataset
. Ez a kód csak szemléltetési célokat szolgál, mert zavaró lenne duplikálni a bemeneteket, vagy egyetlen adatforrást táblatartalmú erőforrásként és szigorúan fájlként értelmezni.
Fontos
Bemenetként való használatához FileDataset
legalább 1.20.0
egy azureml-core
verzióra van szüksége. Ezt az Environment
osztálysal is megadhatja, ahogy azt a későbbiekben tárgyaljuk. Amikor egy lépés befejeződik, a kimeneti adatokat az alábbi kódmintában látható módon tárolhatja:
from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")
Ebben a kódmintában a rendszer az datastore
adatokat egy nevesített test
fájlban tárolja. Az adatok a gépi tanulási munkaterületen lennének elérhetők, a Dataset
névvel registered_dataset
együtt.
Az adatok mellett a folyamatlépések lépésenkénti Python-függőségekkel is rendelkezhetnek. Emellett az egyes SynapseSparkStep
objektumok pontos Azure Synapse Apache Spark-konfigurációt is megadhatnak. Ennek megjelenítéséhez a következő kódminta azt határozza meg, hogy a azureml-core
csomagverziónak legalább 1.20.0
a következőnek kell lennie. Ahogy korábban említettük, a azureml-core
csomagra vonatkozó követelménynek bemenetként kell használnia FileDataset
.
from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")
step_1 = SynapseSparkStep(name = 'synapse-spark',
file = 'dataprep.py',
source_directory="./code",
inputs=[step1_input1, step1_input2],
outputs=[step1_output],
arguments = ["--tabular_input", step1_input1,
"--file_input", step1_input2,
"--output_dir", step1_output],
compute_target = 'link1-spark01',
driver_memory = "7g",
driver_cores = 4,
executor_memory = "7g",
executor_cores = 2,
num_executors = 1,
environment = env)
Ez a kód egyetlen lépést határoz meg az Azure Machine Tanulás folyamatában. A environment
kód értéke beállít egy adott azureml-core
verziót, és a kód szükség szerint további conda- vagy pip-függőségeket is hozzáadhat.
A SynapseSparkStep
zip-fájl feltölti az ./code
alkönyvtárat a helyi számítógépről. Ez a könyvtár újra létre lesz hozva a számítási kiszolgálón, és a lépés az adott könyvtárból futtatja a dataprep.py
szkriptet. Ennek inputs
a lépésnek az step1_input1
step1_input2
step1_output
és outputs
az objektumai a korábban tárgyaltak. Ezeknek az értékeknek a szkripten belüli elérésének legegyszerűbb módja a dataprep.py
névvel ellátott arguments
értékek társítása.
A konstruktor következő argumentumkészlete az SynapseSparkStep
Apache Sparkot vezérli. Ez compute_target
az, 'link1-spark01'
amelyet korábban számítási célként csatoltunk. A többi paraméter megadja a használni kívánt memóriát és magokat.
A mintajegyzetfüzet a következő kódot dataprep.py
használja:
import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset
print(azureml.core.VERSION)
print(os.environ)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()
# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()
# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()
sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)
Ez az "adat-előkészítési" szkript nem végez valós adatátalakítást, de bemutatja, hogyan lehet adatokat lekérni, Spark-adatkeretté konvertálni, és hogyan lehet elvégezni néhány alapvető Apache Spark-manipulációt. Ha meg szeretné keresni a kimenetet az Azure Machine Tanulás Studióban, nyissa meg a gyermekfeladatot, válassza a Kimenetek + naplók lapot, és nyissa meg a logs/azureml/driver/stdout
fájlt, ahogyan az a képernyőképen látható:
SynapseSparkStep
A folyamaton belüli használat
A következő példa az előző szakaszban létrehozott kimenetet SynapseSparkStep
használja. A folyamat más lépései saját egyedi környezetekkel rendelkezhetnek, és a feladatnak megfelelő különböző számítási erőforrásokon futtathatók. A mintajegyzetfüzet egy kis CPU-fürtön futtatja a betanítási lépést:
from azureml.core.compute import AmlCompute
cpu_cluster_name = "cpucluster"
if cpu_cluster_name in ws.compute_targets:
cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
print('Found existing cluster, use it.')
else:
compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
print('Allocating new CPU compute cluster')
cpu_cluster.wait_for_completion(show_output=True)
step2_input = step1_output.as_input("step2_input").as_download()
step_2 = PythonScriptStep(script_name="train.py",
arguments=[step2_input],
inputs=[step2_input],
compute_target=cpu_cluster_name,
source_directory="./code",
allow_reuse=False)
Ez a kód szükség esetén létrehozza az új számítási erőforrást. Ezután átalakítja az step1_output
eredményt a betanítási lépés bemeneteként. A as_download()
beállítás azt jelenti, hogy az adatok a számítási erőforrásba kerülnek, ami gyorsabb hozzáférést eredményez. Ha az adatok olyan nagyok, hogy nem férnek el a helyi számítási merevlemezen, akkor a as_mount()
fájlrendszerrel kell streamelnie az FUSE
adatokat. A compute_target
második lépés nem az 'cpucluster'
'link1-spark01'
adatelőkészítési lépésben használt erőforrás. Ez a lépés az előző lépésben használt szkript helyett dataprep.py
egy egyszerű train.py
szkriptet használ. A mintajegyzetfüzet a szkript részleteit train.py
tartalmazza.
Az összes lépés definiálása után létrehozhatja és futtathatja a folyamatot.
from azureml.pipeline.core import Pipeline
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)
Ez a kód létrehoz egy folyamatot, amely az Apache Spark-készletek adat-előkészítési lépéséből áll, az Azure Synapse Analytics (step_1
) és a betanítási lépés (step_2
) segítségével. Az Azure megvizsgálja a végrehajtási gráf kiszámításának lépései közötti adatfüggőségeket. Ebben az esetben csak egy egyszerű függőség létezik. step2_input
Itt feltétlenül szükség van step1_output
.
A pipeline.submit
hívás szükség esetén létrehoz egy kísérletet synapse-pipeline
, és aszinkron módon elindít egy feladatot benne. A folyamat egyes lépései ennek a fő feladatnak a gyermekfeladataként futnak, és a Studio Kísérletek lapja figyelheti és áttekintheti ezeket a lépéseket.