Verwenden von Apache Spark (unterstützt von Azure Synapse Analytics) in Ihrer Machine Learning-Pipeline (veraltet)
GILT FÜR: Python SDK azureml v1
Warnung
Die im Python SDK v1 verfügbare Azure Synapse Analytics-Integration mit Azure Machine Learning ist veraltet. Der Synapse-Arbeitsbereich, der bei Azure Machine Learning registriert ist, kann weiterhin als verknüpfter Dienst verwendet werden. Ein neuer Synapse-Arbeitsbereich kann jedoch nicht mehr bei Azure Machine Learning als verknüpfter Dienst registriert werden. Es wird empfohlen, serverlose Spark-Compute- und angefügte Synapse Spark-Pools zu verwenden, die in CLI v2 und Python SDK v2 verfügbar sind. Weitere Informationen finden Sie unter https://aka.ms/aml-spark.
In diesem Artikel erfahren Sie, wie Sie Apache Spark-Pools, die von Azure Synapse Analytics unterstützt werden, als Computeziel für einen Datenaufbereitungsschritt in einer Azure Machine Learning-Pipeline verwenden. Sie erfahren, wie eine einzelne Pipeline Computeressourcen verwenden kann, die für einen bestimmten Schritt geeignet sind, z. B. Datenvorbereitung oder Training. Sie werden außerdem lernen, wie die Daten für den Spark-Schritt vorbereitet und an den nächsten Schritt übergeben werden.
Voraussetzungen
Erstellen Sie einen Azure Machine Learning-Arbeitsbereich, der Ihre gesamten Pipelineressourcen aufnehmen soll.
Konfigurieren Sie Ihre Entwicklungsumgebung für die Installation des Azure Machine Learning SDK, oder verwenden Sie eine Azure Machine Learning-Computeinstanz mit bereits installiertem SDK.
Erstellen Sie einen Azure Synapse Analytics-Arbeitsbereich und einen Apache Spark-Pool. Weitere Informationen finden Sie in der Schnellstartanleitung: Erstellen eines serverlosen Apache Spark-Pools mit Synapse Studio
Verknüpfen des Azure Machine Learning-Arbeitsbereichs und des Azure Synapse Analytics-Arbeitsbereichs
Sie erstellen und verwalten Ihre Apache Spark-Pools in einem Azure Synapse Analytics-Arbeitsbereich. Wenn Sie einen Apache Spark-Pool mit einem Azure Machine Learning-Arbeitsbereich integrieren möchten, müssen Sie eine Verknüpfung mit dem Azure Synapse Analytics-Arbeitsbereich herstellen. Sobald Sie Ihren Azure Machine Learning-Arbeitsbereich und Ihre Azure Synapse Analytics-Arbeitsbereiche verknüpft haben, können Sie einen Apache Spark-Pool anfügen mit:
Python SDK, wie weiter unten erläutert
Azure Resource Manager (ARM)-Vorlage. Weitere Informationen finden Sie unter Beispiel-ARM-Vorlage
- Sie können die Befehlszeile verwenden, um die ARM-Vorlage zu befolgen, den verknüpften Dienst hinzuzufügen und den Apache Spark-Pool mit diesem Codebeispiel anzufügen:
az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
Wichtig
Um erfolgreich eine Verknüpfung mit dem Synapse-Arbeitsbereich herzustellen, benötigen Sie die Rolle Besitzer im Synapse-Arbeitsbereich. Überprüfen Sie Ihren Zugriff im Azure-Portal.
Der verknüpfte Dienst erhält bei Erstellung eine systemseitig zugewiesene verwaltete Identität (SAI). Sie müssen dieser vom System zugewiesenen Identität des Linkdiensts die Rolle „Synapse Apache Spark-Admin“ von Synapse Studio zuweisen, damit sie den Spark-Auftrag übermitteln kann (weitere Informationen finden Sie unter Verwalten von Synapse RBAC-Rollenzuweisungen in Synapse Studio).
Außerdem müssen Sie dem Benutzer bzw. der Benutzerin des Azure Machine Learning-Arbeitsbereichs im Azure-Portal der Ressourcenverwaltung die Mitwirkungsrolle erteilen.
Abrufen des Links zwischen Ihrem Azure Synapse Analytics-Arbeitsbereich und Ihrem Azure Machine Learning-Arbeitsbereich
Dieser Code zeigt, wie Sie verknüpfte Dienste in Ihrem Arbeitsbereich abrufen:
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration
ws = Workspace.from_config()
for service in LinkedService.list(ws) :
print(f"Service: {service}")
# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')
Workspace.from_config()
greift zunächst auf Ihren Azure Machine Learning-Arbeitsbereich mit der Konfiguration in der config.json
-Datei zu. (Weitere Informationen finden Sie unter Erstellen einer Arbeitsbereichskonfigurationsdatei). Anschließend gibt der Code alle verknüpften Dienste aus, die im Arbeitsbereich verfügbar sind. Schließlich ruft LinkedService.get()
einen verknüpften Dienst namens 'synapselink1'
ab.
Anfügen Ihres Apache Spark-Pools als Computeziel für Azure Machine Learning
Um Ihren Apache Spark-Pool für einen Schritt in Ihrer Machine Learning-Pipeline zu verwenden, müssen Sie ihn als ComputeTarget
für den Pipelineschritt hinzufügen, wie in diesem Codebeispiel gezeigt:
from azureml.core.compute import SynapseCompute, ComputeTarget
attach_config = SynapseCompute.attach_configuration(
linked_service = linked_service,
type="SynapseSpark",
pool_name="spark01") # This name comes from your Synapse workspace
synapse_compute=ComputeTarget.attach(
workspace=ws,
name='link1-spark01',
attach_configuration=attach_config)
synapse_compute.wait_for_completion()
Der Code konfiguriert zuerst die SynapseCompute
. Das Argument linked_service
ist das LinkedService
-Objekt, das Sie im vorherigen Schritt erstellt oder abgerufen haben. Das Argument type
muss SynapseSpark
sein. Das Argument pool_name
in SynapseCompute.attach_configuration()
muss mit einem vorhandenen Pool in Ihrem Azure Synapse Analytics-Arbeitsbereich übereinstimmen. Weitere Informationen zum Erstellen eines Apache Spark-Pools im Azure Synapse Analytics-Arbeitsbereich finden Sie unter Schnellstart: Erstellen eines serverlosen Apache Spark-Pools mithilfe von Synapse Studio. Der Typ von attach_config
ist ComputeTargetAttachConfiguration
.
Nachdem die Konfiguration erstellt wurde, erstellen Sie ein Machine Learning-ComputeTarget
indem Sie Workspace
- und ComputeTargetAttachConfiguration
-Werte und den Namen übergeben, über den Sie auf die Compute-Instanz im Machine Learning-Arbeitsbereich verweisen möchten. Der Aufruf von ComputeTarget.attach()
ist asynchron, sodass das Beispiel blockiert wird, bis der Aufruf abgeschlossen ist.
Erstellen eines SynapseSparkStep
-Elements, das den verknüpften Apache Spark-Pool verwendet
Das Beispielnotebook Spark-Auftrag im Apache Spark-Pool definiert eine einfache Machine Learning-Pipeline. Zuerst definiert das Notebook einen Datenaufbereitungsschritt, der durch das im vorherigen Schritt definierte synapse_compute
unterstützt wird. Anschließend definiert das Notebook einen Trainingsschritt, der von einem Computeziel unterstützt wird, das besser für das Training geeignet ist. Das Beispielnotebook verwendet die Titanic-Überlebensdatenbank, um Dateneingaben und -ausgaben anzuzeigen. Tatsächlich werden die Daten nicht bereinigt und kein Prädiktivmodell erstellt. Da dieses Beispiel nicht wirklich ein Training beinhaltet, verwendet der Trainingsschritt eine kostengünstige, CPU-basierte Computeressource.
Daten fließen in eine Machine Learning-Pipeline über DatasetConsumptionConfig
-Objekte ein, die Tabellendaten oder eine Reihe von Dateien enthalten können. Die Daten stammen häufig aus Dateien im Blobspeicher eines Datenspeichers für den Arbeitsbereich. Dieses Codebeispiel zeigt einen typischen Code, der Eingaben für eine Machine Learning-Pipeline erzeugt:
from azureml.core import Dataset
datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")
# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()
Das Codebeispiel geht davon aus, dass sich die Datei Titanic.csv
im Blobspeicher befindet. Der Code zeigt, wie die Datei als TabularDataset
und auch als FileDataset
gelesen wird. Dieser Code dient nur zu Demonstrationszwecken, da es verwirrend wäre, Eingaben zu duplizieren oder eine einzelne Datenquelle sowohl als Ressource mit Tabellendaten als auch nur als Datei zu interpretieren.
Wichtig
Um eine FileDataset
als Eingabe zu verwenden, benötigen Sie mindestens eine azureml-core
-Version von 1.20.0
. Sie können dies mit der Klasse Environment
angeben, wie weiter unten beschrieben. Wenn ein Schritt abgeschlossen ist, können Sie die Ausgabedaten, wie in diesem Codebeispiel gezeigt, speichern:
from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")
In diesem Codebeispiel würden die datastore
-Daten in einer Datei mit dem Namen test
gespeichert. Die Daten wären im Machine Learning-Arbeitsbereich als Dataset
mit dem Namen registered_dataset
verfügbar.
Zusätzlich zu den Daten kann ein Pipelineschritt auch schrittweise Python-Abhängigkeiten haben. Außerdem können einzelne SynapseSparkStep
-Objekte ihre exakte Azure Synapse Apache Spark-Konfiguration angeben. Um dies zu verdeutlichen, gibt das folgende Codebeispiel an, dass das Paket azureml-core
mindestens Version 1.20.0
sein muss. Wie bereits erwähnt, ist diese Anforderung für das azureml-core
-Paket erforderlich, um ein FileDataset
als Eingabe zu verwenden.
from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")
step_1 = SynapseSparkStep(name = 'synapse-spark',
file = 'dataprep.py',
source_directory="./code",
inputs=[step1_input1, step1_input2],
outputs=[step1_output],
arguments = ["--tabular_input", step1_input1,
"--file_input", step1_input2,
"--output_dir", step1_output],
compute_target = 'link1-spark01',
driver_memory = "7g",
driver_cores = 4,
executor_memory = "7g",
executor_cores = 2,
num_executors = 1,
environment = env)
Dieser Code gibt einen einzelnen Schritt in der Azure Machine Learning-Pipeline an. Der environment
-Wert dieses Codes legt eine bestimmte azureml-core
-Version fest, und der Code kann bei Bedarf weitere Conda- oder Pip-Abhängigkeiten hinzufügen.
Der SynapseSparkStep
zippt das Unterverzeichnis ./code
vom lokalen Computer und lädt es hoch. Dieses Verzeichnis wird auf dem Computeserver neu erstellt und der Schritt führt das Skript dataprep.py
aus diesem Verzeichnis aus. Die inputs
und outputs
dieses Schritts sind die Objekte step1_input1
, step1_input2
, und step1_output
, die zuvor erläutert wurden. Die einfachste Möglichkeit, auf diese Werte innerhalb des dataprep.py
-Skripts zuzugreifen, besteht darin, ihnen benannte arguments
zuzuordnen.
Der nächste Satz von Argumenten für den SynapseSparkStep
-Konstruktor steuert Apache Spark. Das compute_target
ist 'link1-spark01'
, das wir zuvor als Computeziel angefügt haben. Mit den anderen Parametern werden der Arbeitsspeicher und die Kerne angegeben, die wir verwenden möchten.
Das Beispielnotebook verwendet diesen Code für dataprep.py
:
import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset
print(azureml.core.VERSION)
print(os.environ)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()
# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()
# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()
sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)
Dieses Skript zur „Datenaufbereitung“ führt keine echte Datentransformation durch, sondern zeigt, wie Daten abgerufen, in einen Spark-Datenrahmen konvertiert und einige grundlegende Apache Spark-Bearbeitungen vorgenommen werden. Um die Ausgabe in Azure Machine Learning Studio zu finden, öffnen Sie den untergeordneten Auftrag, wählen die Registerkarte Ausgaben und Protokolle aus und öffnen die Datei logs/azureml/driver/stdout
, wie in diesem Screenshot gezeigt:
Verwenden von SynapseSparkStep
in einer Pipeline
Im nächsten Beispiel wird die Ausgabe aus dem SynapseSparkStep
, der im vorherigen Abschnitt erstellt wurde, verwendet. Andere Schritte in der Pipeline können möglicherweise ihre eigenen eindeutigen Umgebungen aufweisen und möglicherweise auf verschiedenen Computeressourcen ausgeführt werden, die für die jeweilige Aufgabe geeignet sind. Das Beispielnotebook führt den „Trainingsschritt“ auf einem kleinen CPU-Cluster aus:
from azureml.core.compute import AmlCompute
cpu_cluster_name = "cpucluster"
if cpu_cluster_name in ws.compute_targets:
cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
print('Found existing cluster, use it.')
else:
compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
print('Allocating new CPU compute cluster')
cpu_cluster.wait_for_completion(show_output=True)
step2_input = step1_output.as_input("step2_input").as_download()
step_2 = PythonScriptStep(script_name="train.py",
arguments=[step2_input],
inputs=[step2_input],
compute_target=cpu_cluster_name,
source_directory="./code",
allow_reuse=False)
Dieser Code erstellt die neue Computeressource, falls erforderlich. Anschließend konvertiert es das step1_output
-Ergebnis in Eingaben für den Trainingsschritt. Die Option as_download()
bedeutet, dass die Daten auf die Computeressource verschoben werden, was zu einem schnelleren Zugriff führt. Wenn die Daten so umfangreich sind, dass sie nicht auf die Festplatte der lokalen Compute-Instanz passen, müssten Sie die Option as_mount()
verwenden, um die Daten mit dem FUSE
-Dateisystem zu streamen. Das compute_target
dieses zweiten Schrittes ist 'cpucluster'
, nicht die 'link1-spark01'
-Ressource, die Sie im Schritt der Datenaufbereitung verwendet haben. In diesem Schritt wird anstelle des dataprep.py
-Skripts, das Sie im vorherigen Schritt verwendet haben, ein einfaches train.py
-Skript verwendet. Das Beispielnotebook enthält Details zum train.py
-Skript.
Nachdem Sie alle Schritte definiert haben, können Sie Ihre Pipeline erstellen und ausführen.
from azureml.pipeline.core import Pipeline
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)
Dieser Code erstellt eine Pipeline, die aus dem von Azure Synapse Analytics unterstützten Datenaufbereitungsschritt (step_1
) für Apache Spark-Pools und dem Trainingsschritt (step_2
) besteht. Azure untersucht die Datenabhängigkeiten zwischen den Schritten, um das Ausführungsdiagramm zu berechnen. In diesem Fall gibt es nur eine einfache Abhängigkeit. Hier erfordert step2_input
notwendigerweise step1_output
.
Der Aufruf von pipeline.submit
erstellt bei Bedarf ein Experiment mit dem Namen synapse-pipeline
und startet darin asynchron einen Auftrag. Einzelne Schritte innerhalb der Pipeline werden als untergeordnete Aufträge dieses Hauptauftrags ausgeführt und die Seite „Experimente“ von Studio überwacht und überprüft diese Schritte.