Share via


Verwenden von Apache Spark (unterstützt von Azure Synapse Analytics) in Ihrer Machine Learning-Pipeline (veraltet)

GILT FÜR:Python SDK azureml v1

Warnung

Die im Python SDK v1 verfügbare Azure Synapse Analytics-Integration mit Azure Machine Learning ist veraltet. Der Synapse-Arbeitsbereich, der bei Azure Machine Learning registriert ist, kann weiterhin als verknüpfter Dienst verwendet werden. Ein neuer Synapse-Arbeitsbereich kann jedoch nicht mehr bei Azure Machine Learning als verknüpfter Dienst registriert werden. Es wird empfohlen, serverlose Spark-Compute- und angefügte Synapse Spark-Pools zu verwenden, die in CLI v2 und Python SDK v2 verfügbar sind. Weitere Informationen finden Sie unter https://aka.ms/aml-spark.

In diesem Artikel erfahren Sie, wie Sie Apache Spark-Pools, die von Azure Synapse Analytics unterstützt werden, als Computeziel für einen Datenaufbereitungsschritt in einer Azure Machine Learning-Pipeline verwenden. Sie erfahren, wie eine einzelne Pipeline Computeressourcen verwenden kann, die für einen bestimmten Schritt geeignet sind, z. B. Datenvorbereitung oder Training. Sie werden außerdem lernen, wie die Daten für den Spark-Schritt vorbereitet und an den nächsten Schritt übergeben werden.

Voraussetzungen

Sie erstellen und verwalten Ihre Apache Spark-Pools in einem Azure Synapse Analytics-Arbeitsbereich. Wenn Sie einen Apache Spark-Pool mit einem Azure Machine Learning-Arbeitsbereich integrieren möchten, müssen Sie eine Verknüpfung mit dem Azure Synapse Analytics-Arbeitsbereich herstellen. Sobald Sie Ihren Azure Machine Learning-Arbeitsbereich und Ihre Azure Synapse Analytics-Arbeitsbereiche verknüpft haben, können Sie einen Apache Spark-Pool anfügen mit:

  • Azure Machine Learning Studio

  • Python SDK, wie weiter unten erläutert

  • Azure Resource Manager (ARM)-Vorlage. Weitere Informationen finden Sie unter Beispiel-ARM-Vorlage

    • Sie können die Befehlszeile verwenden, um die ARM-Vorlage zu befolgen, den verknüpften Dienst hinzuzufügen und den Apache Spark-Pool mit diesem Codebeispiel anzufügen:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Wichtig

Um erfolgreich eine Verknüpfung mit dem Synapse-Arbeitsbereich herzustellen, benötigen Sie die Rolle Besitzer im Synapse-Arbeitsbereich. Überprüfen Sie Ihren Zugriff im Azure-Portal.

Der verknüpfte Dienst erhält bei Erstellung eine systemseitig zugewiesene verwaltete Identität (SAI). Sie müssen dieser vom System zugewiesenen Identität des Linkdiensts die Rolle „Synapse Apache Spark-Admin“ von Synapse Studio zuweisen, damit sie den Spark-Auftrag übermitteln kann (weitere Informationen finden Sie unter Verwalten von Synapse RBAC-Rollenzuweisungen in Synapse Studio).

Außerdem müssen Sie dem Benutzer bzw. der Benutzerin des Azure Machine Learning-Arbeitsbereichs im Azure-Portal der Ressourcenverwaltung die Mitwirkungsrolle erteilen.

Dieser Code zeigt, wie Sie verknüpfte Dienste in Ihrem Arbeitsbereich abrufen:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Workspace.from_config() greift zunächst auf Ihren Azure Machine Learning-Arbeitsbereich mit der Konfiguration in der config.json-Datei zu. (Weitere Informationen finden Sie unter Erstellen einer Arbeitsbereichskonfigurationsdatei). Anschließend gibt der Code alle verknüpften Dienste aus, die im Arbeitsbereich verfügbar sind. Schließlich ruft LinkedService.get() einen verknüpften Dienst namens 'synapselink1' ab.

Anfügen Ihres Apache Spark-Pools als Computeziel für Azure Machine Learning

Um Ihren Apache Spark-Pool für einen Schritt in Ihrer Machine Learning-Pipeline zu verwenden, müssen Sie ihn als ComputeTarget für den Pipelineschritt hinzufügen, wie in diesem Codebeispiel gezeigt:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

Der Code konfiguriert zuerst die SynapseCompute. Das Argument linked_service ist das LinkedService-Objekt, das Sie im vorherigen Schritt erstellt oder abgerufen haben. Das Argument type muss SynapseSpark sein. Das Argument pool_name in SynapseCompute.attach_configuration() muss mit einem vorhandenen Pool in Ihrem Azure Synapse Analytics-Arbeitsbereich übereinstimmen. Weitere Informationen zum Erstellen eines Apache Spark-Pools im Azure Synapse Analytics-Arbeitsbereich finden Sie unter Schnellstart: Erstellen eines serverlosen Apache Spark-Pools mithilfe von Synapse Studio. Der Typ von attach_config ist ComputeTargetAttachConfiguration.

Nachdem die Konfiguration erstellt wurde, erstellen Sie ein Machine Learning-ComputeTarget indem Sie Workspace- und ComputeTargetAttachConfiguration-Werte und den Namen übergeben, über den Sie auf die Compute-Instanz im Machine Learning-Arbeitsbereich verweisen möchten. Der Aufruf von ComputeTarget.attach() ist asynchron, sodass das Beispiel blockiert wird, bis der Aufruf abgeschlossen ist.

Erstellen eines SynapseSparkStep-Elements, das den verknüpften Apache Spark-Pool verwendet

Das Beispielnotebook Spark-Auftrag im Apache Spark-Pool definiert eine einfache Machine Learning-Pipeline. Zuerst definiert das Notebook einen Datenaufbereitungsschritt, der durch das im vorherigen Schritt definierte synapse_compute unterstützt wird. Anschließend definiert das Notebook einen Trainingsschritt, der von einem Computeziel unterstützt wird, das besser für das Training geeignet ist. Das Beispielnotebook verwendet die Titanic-Überlebensdatenbank, um Dateneingaben und -ausgaben anzuzeigen. Tatsächlich werden die Daten nicht bereinigt und kein Prädiktivmodell erstellt. Da dieses Beispiel nicht wirklich ein Training beinhaltet, verwendet der Trainingsschritt eine kostengünstige, CPU-basierte Computeressource.

Daten fließen in eine Machine Learning-Pipeline über DatasetConsumptionConfig-Objekte ein, die Tabellendaten oder eine Reihe von Dateien enthalten können. Die Daten stammen häufig aus Dateien im Blobspeicher eines Datenspeichers für den Arbeitsbereich. Dieses Codebeispiel zeigt einen typischen Code, der Eingaben für eine Machine Learning-Pipeline erzeugt:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

Das Codebeispiel geht davon aus, dass sich die Datei Titanic.csv im Blobspeicher befindet. Der Code zeigt, wie die Datei als TabularDataset und auch als FileDataset gelesen wird. Dieser Code dient nur zu Demonstrationszwecken, da es verwirrend wäre, Eingaben zu duplizieren oder eine einzelne Datenquelle sowohl als Ressource mit Tabellendaten als auch nur als Datei zu interpretieren.

Wichtig

Um eine FileDataset als Eingabe zu verwenden, benötigen Sie mindestens eine azureml-core-Version von 1.20.0. Sie können dies mit der Klasse Environment angeben, wie weiter unten beschrieben. Wenn ein Schritt abgeschlossen ist, können Sie die Ausgabedaten, wie in diesem Codebeispiel gezeigt, speichern:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

In diesem Codebeispiel würden die datastore-Daten in einer Datei mit dem Namen test gespeichert. Die Daten wären im Machine Learning-Arbeitsbereich als Dataset mit dem Namen registered_dataset verfügbar.

Zusätzlich zu den Daten kann ein Pipelineschritt auch schrittweise Python-Abhängigkeiten haben. Außerdem können einzelne SynapseSparkStep-Objekte ihre exakte Azure Synapse Apache Spark-Konfiguration angeben. Um dies zu verdeutlichen, gibt das folgende Codebeispiel an, dass das Paket azureml-core mindestens Version 1.20.0 sein muss. Wie bereits erwähnt, ist diese Anforderung für das azureml-core-Paket erforderlich, um ein FileDataset als Eingabe zu verwenden.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Dieser Code gibt einen einzelnen Schritt in der Azure Machine Learning-Pipeline an. Der environment-Wert dieses Codes legt eine bestimmte azureml-core-Version fest, und der Code kann bei Bedarf weitere Conda- oder Pip-Abhängigkeiten hinzufügen.

Der SynapseSparkStep zippt das Unterverzeichnis ./code vom lokalen Computer und lädt es hoch. Dieses Verzeichnis wird auf dem Computeserver neu erstellt und der Schritt führt das Skript dataprep.py aus diesem Verzeichnis aus. Die inputs und outputs dieses Schritts sind die Objekte step1_input1, step1_input2, und step1_output, die zuvor erläutert wurden. Die einfachste Möglichkeit, auf diese Werte innerhalb des dataprep.py-Skripts zuzugreifen, besteht darin, ihnen benannte arguments zuzuordnen.

Der nächste Satz von Argumenten für den SynapseSparkStep-Konstruktor steuert Apache Spark. Das compute_target ist 'link1-spark01', das wir zuvor als Computeziel angefügt haben. Mit den anderen Parametern werden der Arbeitsspeicher und die Kerne angegeben, die wir verwenden möchten.

Das Beispielnotebook verwendet diesen Code für dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Dieses Skript zur „Datenaufbereitung“ führt keine echte Datentransformation durch, sondern zeigt, wie Daten abgerufen, in einen Spark-Datenrahmen konvertiert und einige grundlegende Apache Spark-Bearbeitungen vorgenommen werden. Um die Ausgabe in Azure Machine Learning Studio zu finden, öffnen Sie den untergeordneten Auftrag, wählen die Registerkarte Ausgaben und Protokolle aus und öffnen die Datei logs/azureml/driver/stdout, wie in diesem Screenshot gezeigt:

Screenshot of Studio showing stdout tab of child job

Verwenden von SynapseSparkStep in einer Pipeline

Im nächsten Beispiel wird die Ausgabe aus dem SynapseSparkStep, der im vorherigen Abschnitt erstellt wurde, verwendet. Andere Schritte in der Pipeline können möglicherweise ihre eigenen eindeutigen Umgebungen aufweisen und möglicherweise auf verschiedenen Computeressourcen ausgeführt werden, die für die jeweilige Aufgabe geeignet sind. Das Beispielnotebook führt den „Trainingsschritt“ auf einem kleinen CPU-Cluster aus:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Dieser Code erstellt die neue Computeressource, falls erforderlich. Anschließend konvertiert es das step1_output-Ergebnis in Eingaben für den Trainingsschritt. Die Option as_download() bedeutet, dass die Daten auf die Computeressource verschoben werden, was zu einem schnelleren Zugriff führt. Wenn die Daten so umfangreich sind, dass sie nicht auf die Festplatte der lokalen Compute-Instanz passen, müssten Sie die Option as_mount() verwenden, um die Daten mit dem FUSE-Dateisystem zu streamen. Das compute_target dieses zweiten Schrittes ist 'cpucluster', nicht die 'link1-spark01'-Ressource, die Sie im Schritt der Datenaufbereitung verwendet haben. In diesem Schritt wird anstelle des dataprep.py-Skripts, das Sie im vorherigen Schritt verwendet haben, ein einfaches train.py-Skript verwendet. Das Beispielnotebook enthält Details zum train.py-Skript.

Nachdem Sie alle Schritte definiert haben, können Sie Ihre Pipeline erstellen und ausführen.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Dieser Code erstellt eine Pipeline, die aus dem von Azure Synapse Analytics unterstützten Datenaufbereitungsschritt (step_1) für Apache Spark-Pools und dem Trainingsschritt (step_2) besteht. Azure untersucht die Datenabhängigkeiten zwischen den Schritten, um das Ausführungsdiagramm zu berechnen. In diesem Fall gibt es nur eine einfache Abhängigkeit. Hier erfordert step2_input notwendigerweise step1_output.

Der Aufruf von pipeline.submit erstellt bei Bedarf ein Experiment mit dem Namen synapse-pipeline und startet darin asynchron einen Auftrag. Einzelne Schritte innerhalb der Pipeline werden als untergeordnete Aufträge dieses Hauptauftrags ausgeführt und die Seite „Experimente“ von Studio überwacht und überprüft diese Schritte.

Nächste Schritte