Så här använder du Apache Spark (drivs av Azure Synapse Analytics) i din maskininlärningspipeline (inaktuell)
GÄLLER FÖR:Python SDK azureml v1
Varning
Azure Synapse Analytics-integreringen med Azure Machine Learning, som finns i Python SDK v1, är inaktuell. Användare kan fortfarande använda Synapse-arbetsytan, registrerad med Azure Machine Learning, som en länkad tjänst. Men en ny Synapse-arbetsyta kan inte längre registreras med Azure Machine Learning som en länkad tjänst. Vi rekommenderar att du använder serverlös Spark-beräkning och anslutna Synapse Spark-pooler, som är tillgängliga i CLI v2 och Python SDK v2. Mer information finns på https://aka.ms/aml-spark.
I den här artikeln får du lära dig hur du använder Apache Spark-pooler som drivs av Azure Synapse Analytics som beräkningsmål för ett dataförberedelsesteg i en Azure Machine Learning-pipeline. Du lär dig hur en enda pipeline kan använda beräkningsresurser som passar för det specifika steget , till exempel förberedelse av data eller träning. Du får också lära dig hur data förbereds för Spark-steget och hur de går vidare till nästa steg.
Förutsättningar
Skapa en Azure Machine Learning-arbetsyta för att lagra alla dina pipelineresurser
Konfigurera utvecklingsmiljön för att installera Azure Machine Learning SDK eller använd en Azure Machine Learning-beräkningsinstans med SDK:t redan installerat
Skapa en Azure Synapse Analytics-arbetsyta och Apache Spark-pool. Mer information finns i Snabbstart: Skapa en serverlös Apache Spark-pool med Synapse Studio
Länka din Azure Machine Learning-arbetsyta och Azure Synapse Analytics-arbetsyta
Du skapar och administrerar dina Apache Spark-pooler på en Azure Synapse Analytics-arbetsyta. Om du vill integrera en Apache Spark-pool med en Azure Machine Learning-arbetsyta måste du länka till Azure Synapse Analytics-arbetsytan. När du har länkat din Azure Machine Learning-arbetsyta och dina Azure Synapse Analytics-arbetsytor kan du ansluta en Apache Spark-pool med
Python SDK, som beskrivs senare
Azure Resource Manager-mall (ARM). Mer information finns i Exempel på ARM-mall
- Du kan använda kommandoraden för att följa ARM-mallen, lägga till den länkade tjänsten och bifoga Apache Spark-poolen med det här kodexemplet:
az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
Viktigt!
Om du vill länka till Synapse-arbetsytan måste du beviljas rollen Ägare för Synapse-arbetsytan. Kontrollera din åtkomst i Azure-portalen.
Den länkade tjänsten hämtar en systemtilldelad hanterad identitet (SAI) när den skapas. Du måste tilldela den här länktjänsten SAI rollen "Synapse Apache Spark-administratör" från Synapse Studio, så att den kan skicka Spark-jobbet (se Hantera Rolltilldelningar för Synapse RBAC i Synapse Studio).
Du måste också ge användaren av Azure Machine Learning-arbetsytan rollen "Deltagare" från Azure-portalen för resurshantering.
Hämta länken mellan din Azure Synapse Analytics-arbetsyta och din Azure Machine Learning-arbetsyta
Den här koden visar hur du hämtar länkade tjänster på din arbetsyta:
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration
ws = Workspace.from_config()
for service in LinkedService.list(ws) :
print(f"Service: {service}")
# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')
Workspace.from_config()
Först kommer du åt din Azure Machine Learning-arbetsyta med konfigurationen config.json
i filen. (Mer information finns i Skapa en konfigurationsfil för arbetsytan). Sedan skriver koden ut alla länkade tjänster som är tillgängliga på arbetsytan. Slutligen LinkedService.get()
hämtar en länkad tjänst med namnet 'synapselink1'
.
Koppla din Apache Spark-pool som beräkningsmål för Azure Machine Learning
Om du vill använda Apache Spark-poolen för att driva ett steg i din maskininlärningspipeline måste du bifoga den som ett ComputeTarget
för pipelinesteget, som du ser i det här kodexemplet:
from azureml.core.compute import SynapseCompute, ComputeTarget
attach_config = SynapseCompute.attach_configuration(
linked_service = linked_service,
type="SynapseSpark",
pool_name="spark01") # This name comes from your Synapse workspace
synapse_compute=ComputeTarget.attach(
workspace=ws,
name='link1-spark01',
attach_configuration=attach_config)
synapse_compute.wait_for_completion()
Koden konfigurerar SynapseCompute
först . Argumentet linked_service
är det LinkedService
objekt som du skapade eller hämtade i föregående steg. Argumentet type
måste vara SynapseSpark
. Argumentet pool_name
i SynapseCompute.attach_configuration()
måste matcha argumentet för en befintlig pool på din Azure Synapse Analytics-arbetsyta. Mer information om hur du skapar en Apache Spark-pool på Azure Synapse Analytics-arbetsytan finns i Snabbstart: Skapa en serverlös Apache Spark-pool med Synapse Studio. Typen attach_config
är ComputeTargetAttachConfiguration
.
När konfigurationen har skapats skapar du en maskininlärning ComputeTarget
genom att skicka Workspace
in värdena och ComputeTargetAttachConfiguration
och namnet som du vill referera till i beräkningen på maskininlärningsarbetsytan. Anropet till ComputeTarget.attach()
är asynkront, så exemplet blockeras tills anropet har slutförts.
Skapa en SynapseSparkStep
som använder den länkade Apache Spark-poolen
Spark-exempeljobbet för notebook-filen i Apache Spark-poolen definierar en enkel maskininlärningspipeline. Först definierar notebook-filen ett dataförberedelsesteg som drivs av det synapse_compute
som definierades i föregående steg. Sedan definierar notebook-filen ett träningssteg som drivs av ett beräkningsmål som är lämpligare för träning. Exempelanteckningsboken använder Titanics överlevnadsdatabas för att visa indata och utdata. Den rensar inte data eller skapar en förutsägelsemodell. Eftersom det här exemplet inte omfattar träning använder träningssteget en billig, CPU-baserad beräkningsresurs.
Data flödar till en maskininlärningspipeline via DatasetConsumptionConfig
objekt som kan innehålla tabelldata eller uppsättningar med filer. Data kommer ofta från filer i bloblagring i ett datalager för arbetsytor. Det här kodexemplet visar typisk kod som skapar indata för en maskininlärningspipeline:
from azureml.core import Dataset
datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")
# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()
Kodexemplet förutsätter att filen Titanic.csv
finns i bloblagring. Koden visar hur du läser filen både som en TabularDataset
och som en FileDataset
. Den här koden är endast i demonstrationssyfte eftersom det skulle bli förvirrande att duplicera indata eller tolka en enskild datakälla som både en tabellinnehållande resurs och strikt som en fil.
Viktigt!
Om du vill använda en FileDataset
som indata behöver du en azureml-core
version av minst 1.20.0
. Du kan ange detta med klassen enligt beskrivningen Environment
senare. När ett steg är klart kan du lagra utdata, som du ser i det här kodexemplet:
from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")
I det här kodexemplet datastore
skulle data lagras i en fil med namnet test
. Data skulle vara tillgängliga på maskininlärningsarbetsytan som en Dataset
, med namnet registered_dataset
.
Förutom data kan ett pipelinesteg ha Python-beroenden per steg. Dessutom kan enskilda SynapseSparkStep
objekt ange sin exakta Azure Synapse Apache Spark-konfiguration. För att visa detta anger följande kodexempel att azureml-core
paketversionen måste vara minst 1.20.0
. Som tidigare nämnts krävs det här kravet för azureml-core
att paketet ska kunna användas FileDataset
som indata.
from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")
step_1 = SynapseSparkStep(name = 'synapse-spark',
file = 'dataprep.py',
source_directory="./code",
inputs=[step1_input1, step1_input2],
outputs=[step1_output],
arguments = ["--tabular_input", step1_input1,
"--file_input", step1_input2,
"--output_dir", step1_output],
compute_target = 'link1-spark01',
driver_memory = "7g",
driver_cores = 4,
executor_memory = "7g",
executor_cores = 2,
num_executors = 1,
environment = env)
Den här koden anger ett enda steg i Azure Machine Learning-pipelinen. Värdet environment
för den här koden anger en specifik azureml-core
version och koden kan lägga till andra conda- eller pip-beroenden efter behov.
Zippar SynapseSparkStep
och laddar upp underkatalogen ./code
från den lokala datorn. Katalogen återskapas på beräkningsservern och steget kör skriptet från katalogen dataprep.py
. Och inputs
outputs
för det steget är objekten step1_input1
, step1_input2
och som step1_output
diskuterades tidigare. Det enklaste sättet att komma åt dessa värden i skriptet dataprep.py
är att associera dem med med namnet arguments
.
Nästa uppsättning argument till SynapseSparkStep
konstruktorn styr Apache Spark. compute_target
är 'link1-spark01'
det som vi kopplade som ett beräkningsmål tidigare. De andra parametrarna anger det minne och de kärnor som vi vill använda.
Exempelanteckningsboken använder den här koden för dataprep.py
:
import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset
print(azureml.core.VERSION)
print(os.environ)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()
# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()
# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()
sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)
Det här "dataförberedelseskriptet" utför ingen verklig datatransformering, men det visar hur du hämtar data, konverterar dem till en Spark-dataram och hur du utför grundläggande Apache Spark-manipulering. Om du vill hitta utdata i Azure Machine Learning-studio öppnar du det underordnade jobbet, väljer fliken Utdata + loggar och öppnar filen enligt den här skärmbildenlogs/azureml/driver/stdout
:
SynapseSparkStep
Använda i en pipeline
I nästa exempel används utdata från det SynapseSparkStep
som skapades i föregående avsnitt. Andra steg i pipelinen kan ha egna unika miljöer och kan köras på olika beräkningsresurser som är lämpliga för den aktuella aktiviteten. Exempelanteckningsboken kör "träningssteget" i ett litet CPU-kluster:
from azureml.core.compute import AmlCompute
cpu_cluster_name = "cpucluster"
if cpu_cluster_name in ws.compute_targets:
cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
print('Found existing cluster, use it.')
else:
compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
print('Allocating new CPU compute cluster')
cpu_cluster.wait_for_completion(show_output=True)
step2_input = step1_output.as_input("step2_input").as_download()
step_2 = PythonScriptStep(script_name="train.py",
arguments=[step2_input],
inputs=[step2_input],
compute_target=cpu_cluster_name,
source_directory="./code",
allow_reuse=False)
Den här koden skapar den nya beräkningsresursen om det behövs. Sedan konverteras resultatet step1_output
till indata för träningssteget. Alternativet as_download()
innebär att data flyttas till beräkningsresursen, vilket ger snabbare åtkomst. Om data var så stora att de inte skulle få plats på den lokala beräkningshårdenheten måste du använda as_mount()
alternativet för att strömma data med FUSE
filsystemet. Det compute_target
andra steget är 'cpucluster'
, inte den 'link1-spark01'
resurs som du använde i steget för dataförberedelse. Det här steget använder ett enkelt train.py
skript i stället för det dataprep.py
skript som du använde i föregående steg. Exempelanteckningsboken innehåller information om skriptet train.py
.
När du har definierat alla dina steg kan du skapa och köra din pipeline.
from azureml.pipeline.core import Pipeline
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)
Den här koden skapar en pipeline som består av steget för förberedelse av data i Apache Spark-pooler som drivs av Azure Synapse Analytics (step_1
) och träningssteget (step_2
). Azure undersöker databeroendena mellan stegen för att beräkna körningsdiagrammet. I det här fallet finns det bara ett enkelt beroende. step2_input
Här kräver step1_output
nödvändigtvis .
Anropet pipeline.submit
skapar vid behov ett experiment med namnet synapse-pipeline
och startar asynkront ett jobb i det. Enskilda steg i pipelinen körs som underordnade jobb för det här huvudjobbet, och sidan Experiment i Studio kan övervaka och granska dessa steg.