Så här använder du Apache Spark (drivs av Azure Synapse Analytics) i din maskininlärningspipeline (inaktuell)

GÄLLER FÖR:Python SDK azureml v1

Varning

Azure Synapse Analytics-integreringen med Azure Machine Learning, som finns i Python SDK v1, är inaktuell. Användare kan fortfarande använda Synapse-arbetsytan, registrerad med Azure Machine Learning, som en länkad tjänst. Men en ny Synapse-arbetsyta kan inte längre registreras med Azure Machine Learning som en länkad tjänst. Vi rekommenderar att du använder serverlös Spark-beräkning och anslutna Synapse Spark-pooler, som är tillgängliga i CLI v2 och Python SDK v2. Mer information finns på https://aka.ms/aml-spark.

I den här artikeln får du lära dig hur du använder Apache Spark-pooler som drivs av Azure Synapse Analytics som beräkningsmål för ett dataförberedelsesteg i en Azure Machine Learning-pipeline. Du lär dig hur en enda pipeline kan använda beräkningsresurser som passar för det specifika steget , till exempel förberedelse av data eller träning. Du får också lära dig hur data förbereds för Spark-steget och hur de går vidare till nästa steg.

Förutsättningar

Du skapar och administrerar dina Apache Spark-pooler på en Azure Synapse Analytics-arbetsyta. Om du vill integrera en Apache Spark-pool med en Azure Machine Learning-arbetsyta måste du länka till Azure Synapse Analytics-arbetsytan. När du har länkat din Azure Machine Learning-arbetsyta och dina Azure Synapse Analytics-arbetsytor kan du ansluta en Apache Spark-pool med

  • Azure Machine Learning-studio

  • Python SDK, som beskrivs senare

  • Azure Resource Manager-mall (ARM). Mer information finns i Exempel på ARM-mall

    • Du kan använda kommandoraden för att följa ARM-mallen, lägga till den länkade tjänsten och bifoga Apache Spark-poolen med det här kodexemplet:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Viktigt!

Om du vill länka till Synapse-arbetsytan måste du beviljas rollen Ägare för Synapse-arbetsytan. Kontrollera din åtkomst i Azure-portalen.

Den länkade tjänsten hämtar en systemtilldelad hanterad identitet (SAI) när den skapas. Du måste tilldela den här länktjänsten SAI rollen "Synapse Apache Spark-administratör" från Synapse Studio, så att den kan skicka Spark-jobbet (se Hantera Rolltilldelningar för Synapse RBAC i Synapse Studio).

Du måste också ge användaren av Azure Machine Learning-arbetsytan rollen "Deltagare" från Azure-portalen för resurshantering.

Den här koden visar hur du hämtar länkade tjänster på din arbetsyta:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Workspace.from_config() Först kommer du åt din Azure Machine Learning-arbetsyta med konfigurationen config.json i filen. (Mer information finns i Skapa en konfigurationsfil för arbetsytan). Sedan skriver koden ut alla länkade tjänster som är tillgängliga på arbetsytan. Slutligen LinkedService.get() hämtar en länkad tjänst med namnet 'synapselink1'.

Koppla din Apache Spark-pool som beräkningsmål för Azure Machine Learning

Om du vill använda Apache Spark-poolen för att driva ett steg i din maskininlärningspipeline måste du bifoga den som ett ComputeTarget för pipelinesteget, som du ser i det här kodexemplet:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

Koden konfigurerar SynapseComputeförst . Argumentet linked_service är det LinkedService objekt som du skapade eller hämtade i föregående steg. Argumentet type måste vara SynapseSpark. Argumentet pool_name i SynapseCompute.attach_configuration() måste matcha argumentet för en befintlig pool på din Azure Synapse Analytics-arbetsyta. Mer information om hur du skapar en Apache Spark-pool på Azure Synapse Analytics-arbetsytan finns i Snabbstart: Skapa en serverlös Apache Spark-pool med Synapse Studio. Typen attach_config är ComputeTargetAttachConfiguration.

När konfigurationen har skapats skapar du en maskininlärning ComputeTarget genom att skicka Workspace in värdena och ComputeTargetAttachConfiguration och namnet som du vill referera till i beräkningen på maskininlärningsarbetsytan. Anropet till ComputeTarget.attach() är asynkront, så exemplet blockeras tills anropet har slutförts.

Skapa en SynapseSparkStep som använder den länkade Apache Spark-poolen

Spark-exempeljobbet för notebook-filen i Apache Spark-poolen definierar en enkel maskininlärningspipeline. Först definierar notebook-filen ett dataförberedelsesteg som drivs av det synapse_compute som definierades i föregående steg. Sedan definierar notebook-filen ett träningssteg som drivs av ett beräkningsmål som är lämpligare för träning. Exempelanteckningsboken använder Titanics överlevnadsdatabas för att visa indata och utdata. Den rensar inte data eller skapar en förutsägelsemodell. Eftersom det här exemplet inte omfattar träning använder träningssteget en billig, CPU-baserad beräkningsresurs.

Data flödar till en maskininlärningspipeline via DatasetConsumptionConfig objekt som kan innehålla tabelldata eller uppsättningar med filer. Data kommer ofta från filer i bloblagring i ett datalager för arbetsytor. Det här kodexemplet visar typisk kod som skapar indata för en maskininlärningspipeline:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

Kodexemplet förutsätter att filen Titanic.csv finns i bloblagring. Koden visar hur du läser filen både som en TabularDataset och som en FileDataset. Den här koden är endast i demonstrationssyfte eftersom det skulle bli förvirrande att duplicera indata eller tolka en enskild datakälla som både en tabellinnehållande resurs och strikt som en fil.

Viktigt!

Om du vill använda en FileDataset som indata behöver du en azureml-core version av minst 1.20.0. Du kan ange detta med klassen enligt beskrivningen Environment senare. När ett steg är klart kan du lagra utdata, som du ser i det här kodexemplet:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

I det här kodexemplet datastore skulle data lagras i en fil med namnet test. Data skulle vara tillgängliga på maskininlärningsarbetsytan som en Dataset, med namnet registered_dataset.

Förutom data kan ett pipelinesteg ha Python-beroenden per steg. Dessutom kan enskilda SynapseSparkStep objekt ange sin exakta Azure Synapse Apache Spark-konfiguration. För att visa detta anger följande kodexempel att azureml-core paketversionen måste vara minst 1.20.0. Som tidigare nämnts krävs det här kravet för azureml-core att paketet ska kunna användas FileDataset som indata.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Den här koden anger ett enda steg i Azure Machine Learning-pipelinen. Värdet environment för den här koden anger en specifik azureml-core version och koden kan lägga till andra conda- eller pip-beroenden efter behov.

Zippar SynapseSparkStep och laddar upp underkatalogen ./code från den lokala datorn. Katalogen återskapas på beräkningsservern och steget kör skriptet från katalogen dataprep.py . Och inputsoutputs för det steget är objekten step1_input1, step1_input2och som step1_output diskuterades tidigare. Det enklaste sättet att komma åt dessa värden i skriptet dataprep.py är att associera dem med med namnet arguments.

Nästa uppsättning argument till SynapseSparkStep konstruktorn styr Apache Spark. compute_target är 'link1-spark01' det som vi kopplade som ett beräkningsmål tidigare. De andra parametrarna anger det minne och de kärnor som vi vill använda.

Exempelanteckningsboken använder den här koden för dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Det här "dataförberedelseskriptet" utför ingen verklig datatransformering, men det visar hur du hämtar data, konverterar dem till en Spark-dataram och hur du utför grundläggande Apache Spark-manipulering. Om du vill hitta utdata i Azure Machine Learning-studio öppnar du det underordnade jobbet, väljer fliken Utdata + loggar och öppnar filen enligt den här skärmbildenlogs/azureml/driver/stdout:

Screenshot of Studio showing stdout tab of child job

SynapseSparkStep Använda i en pipeline

I nästa exempel används utdata från det SynapseSparkStep som skapades i föregående avsnitt. Andra steg i pipelinen kan ha egna unika miljöer och kan köras på olika beräkningsresurser som är lämpliga för den aktuella aktiviteten. Exempelanteckningsboken kör "träningssteget" i ett litet CPU-kluster:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Den här koden skapar den nya beräkningsresursen om det behövs. Sedan konverteras resultatet step1_output till indata för träningssteget. Alternativet as_download() innebär att data flyttas till beräkningsresursen, vilket ger snabbare åtkomst. Om data var så stora att de inte skulle få plats på den lokala beräkningshårdenheten måste du använda as_mount() alternativet för att strömma data med FUSE filsystemet. Det compute_target andra steget är 'cpucluster', inte den 'link1-spark01' resurs som du använde i steget för dataförberedelse. Det här steget använder ett enkelt train.py skript i stället för det dataprep.py skript som du använde i föregående steg. Exempelanteckningsboken innehåller information om skriptet train.py .

När du har definierat alla dina steg kan du skapa och köra din pipeline.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Den här koden skapar en pipeline som består av steget för förberedelse av data i Apache Spark-pooler som drivs av Azure Synapse Analytics (step_1) och träningssteget (step_2). Azure undersöker databeroendena mellan stegen för att beräkna körningsdiagrammet. I det här fallet finns det bara ett enkelt beroende. step2_input Här kräver step1_outputnödvändigtvis .

Anropet pipeline.submit skapar vid behov ett experiment med namnet synapse-pipelineoch startar asynkront ett jobb i det. Enskilda steg i pipelinen körs som underordnade jobb för det här huvudjobbet, och sidan Experiment i Studio kan övervaka och granska dessa steg.

Nästa steg